Skip to content

Commit

Permalink
[ZEPPELIN-4504]. Fix searching of keywords in notebooks
Browse files Browse the repository at this point in the history
  • Loading branch information
zjffdu committed Mar 9, 2020
1 parent 932ff58 commit 0242387
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 35 deletions.
28 changes: 23 additions & 5 deletions conf/zeppelin-site.xml.template
Expand Up @@ -651,10 +651,28 @@
<description>Kubernetes yaml spec files</description>
</property>

<property>
<name>zeppelin.docker.container.image</name>
<value>apache/zeppelin:0.8.0</value>
<description>Docker image for interpreters</description>
</property>
<property>
<name>zeppelin.docker.container.image</name>
<value>apache/zeppelin:0.8.0</value>
<description>Docker image for interpreters</description>
</property>

<property>
<name>zeppelin.search.index.rebuild</name>
<value>false</value>
<description>Whether rebuild index when zeppelin start. If true, it would read all notes and rebuild the index, this would consume lots of memory if you have large amounts of notes, so by default it is false</description>
</property>

<property>
<name>zeppelin.search.use.disk</name>
<value>true</value>
<description>Whether using disk for storing search index, if false, memory will be used instead.</description>
</property>

<property>
<name>zeppelin.search.index.path</name>
<value>/tmp/zeppelin-index</value>
<description>path for storing search index on disk.</description>
</property>

</configuration>
Expand Up @@ -707,12 +707,16 @@ public String getZeppelinProxyPassword() {
return getString(ConfVars.ZEPPELIN_PROXY_PASSWORD);
}

public Boolean isIndexRebuild() {
return getBoolean(ConfVars.ZEPPELIN_SEARCH_INDEX_REBUILD);
}

public Boolean isZeppelinSearchUseDisk() {
return getBoolean(ConfVars.ZEPPELIN_SEARCH_USE_DISK);
}

public String getZeppelinSearchTempPath() {
return getRelativeDir(ConfVars.ZEPPELIN_SEARCH_TEMP_PATH);
return getRelativeDir(ConfVars.ZEPPELIN_SEARCH_INDEX_PATH);
}

public String getClusterAddress() {
Expand Down Expand Up @@ -967,8 +971,9 @@ public enum ConfVars {
ZEPPELIN_PROXY_URL("zeppelin.proxy.url", null),
ZEPPELIN_PROXY_USER("zeppelin.proxy.user", null),
ZEPPELIN_PROXY_PASSWORD("zeppelin.proxy.password", null),
ZEPPELIN_SEARCH_USE_DISK("zeppelin.search.use.disk", false),
ZEPPELIN_SEARCH_TEMP_PATH("zeppelin.search.temp.path", System.getProperty("java.io.tmpdir"));
ZEPPELIN_SEARCH_INDEX_REBUILD("zeppelin.search.index.rebuild", false),
ZEPPELIN_SEARCH_USE_DISK("zeppelin.search.use.disk", true),
ZEPPELIN_SEARCH_INDEX_PATH("zeppelin.search.index.path", "/tmp/zeppelin-index");

private String varName;
@SuppressWarnings("rawtypes")
Expand Down
Expand Up @@ -100,9 +100,12 @@ public Notebook(
this.interpreterSettingManager.setNotebook(this);
this.noteSearchService = noteSearchService;
this.credentials = credentials;

this.noteEventListeners.add(this.noteSearchService);
this.noteEventListeners.add(this.interpreterSettingManager);

if (conf.isIndexRebuild()) {
noteSearchService.startRebuildIndex(getAllNotes());
}
}

@Inject
Expand Down
Expand Up @@ -20,7 +20,6 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collection;
Expand All @@ -30,7 +29,7 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import org.apache.commons.io.FileUtils;

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
Expand All @@ -57,7 +56,7 @@
import org.apache.lucene.search.highlight.TextFragment;
import org.apache.lucene.search.highlight.TokenSources;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.MMapDirectory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.RAMDirectory;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.notebook.Note;
Expand All @@ -78,34 +77,32 @@ public class LuceneSearch extends SearchService {
private static final String PARAGRAPH = "paragraph";
private static final String ID_FIELD = "id";

private final ZeppelinConfiguration zeppelinConfiguration;
private Directory directory;
private Path directoryPath;
private Path indexPath;
private Directory indexDirectory;
private Analyzer analyzer;
private IndexWriterConfig indexWriterConfig;
private IndexWriter indexWriter;

@Inject
public LuceneSearch(ZeppelinConfiguration zeppelinConfiguration) {
public LuceneSearch(ZeppelinConfiguration conf) {
super("LuceneSearch-Thread");
this.zeppelinConfiguration = zeppelinConfiguration;
if (zeppelinConfiguration.isZeppelinSearchUseDisk()) {

if (conf.isZeppelinSearchUseDisk()) {
try {
this.directoryPath =
Files.createTempDirectory(
Paths.get(zeppelinConfiguration.getZeppelinSearchTempPath()), "zeppelin-search-");
this.directory = new MMapDirectory(directoryPath);
this.indexPath = Paths.get(conf.getZeppelinSearchTempPath());
this.indexDirectory = FSDirectory.open(indexPath);
logger.info("Use {} for storing lucene search index", this.indexPath);
} catch (IOException e) {
throw new RuntimeException(
"Failed to create temporary directory for search service. Use memory instead", e);
"Failed to create index directory for search service. Use memory instead", e);
}
} else {
this.directory = new RAMDirectory();
this.indexDirectory = new RAMDirectory();
}
this.analyzer = new StandardAnalyzer();
this.indexWriterConfig = new IndexWriterConfig(analyzer);
try {
this.indexWriter = new IndexWriter(directory, indexWriterConfig);
this.indexWriter = new IndexWriter(indexDirectory, indexWriterConfig);
} catch (IOException e) {
logger.error("Failed to create new IndexWriter", e);
}
Expand All @@ -116,12 +113,12 @@ public LuceneSearch(ZeppelinConfiguration zeppelinConfiguration) {
*/
@Override
public List<Map<String, String>> query(String queryStr) {
if (null == directory) {
if (null == indexDirectory) {
throw new IllegalStateException(
"Something went wrong on instance creation time, index dir is null");
}
List<Map<String, String>> result = Collections.emptyList();
try (IndexReader indexReader = DirectoryReader.open(directory)) {
try (IndexReader indexReader = DirectoryReader.open(indexDirectory)) {
IndexSearcher indexSearcher = new IndexSearcher(indexReader);
Analyzer analyzer = new StandardAnalyzer();
MultiFieldQueryParser parser =
Expand All @@ -135,7 +132,7 @@ public List<Map<String, String>> query(String queryStr) {

result = doSearch(indexSearcher, query, analyzer, highlighter);
} catch (IOException e) {
logger.error("Failed to open index dir {}, make sure indexing finished OK", directory, e);
logger.error("Failed to open index dir {}, make sure indexing finished OK", indexDirectory, e);
} catch (ParseException e) {
logger.error("Failed to parse query " + queryStr, e);
}
Expand Down Expand Up @@ -396,9 +393,6 @@ private void deleteDoc(String noteId, Paragraph p) {
public void close() {
try {
indexWriter.close();
if (zeppelinConfiguration.isZeppelinNotebookCronEnable() && null != directoryPath) {
FileUtils.deleteDirectory(directoryPath.toFile());
}
} catch (IOException e) {
logger.error("Failed to .close() the notebook index", e);
}
Expand All @@ -425,4 +419,17 @@ private void indexDoc(IndexWriter w, String noteId, String noteName, Paragraph p
Document doc = newDocument(id, noteName, p);
w.addDocument(doc);
}

@Override
public void startRebuildIndex(List<Note> notes) {
Thread thread = new Thread(() -> {
logger.info("Starting rebuild index");
for (Note note: notes) {
addIndexDoc(note);
}
logger.info("Finish rebuild index");
});
thread.setName("LuceneSearch-RebuildIndex-Thread");
thread.start();
}
}
Expand Up @@ -133,4 +133,6 @@ public void handleParagraphUpdateEvent(ParagraphUpdateEvent paragraphUpdateEvent
e.printStackTrace();
}
}

public abstract void startRebuildIndex(List<Note> notes);
}
Expand Up @@ -22,9 +22,13 @@
import static org.mockito.Mockito.when;

import com.google.common.base.Splitter;

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;

import com.google.common.io.Files;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.interpreter.InterpreterFactory;
import org.apache.zeppelin.interpreter.InterpreterSetting;
Expand All @@ -40,17 +44,18 @@
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.quartz.SchedulerException;

public class LuceneSearchTest {

private Notebook notebook;
private InterpreterSettingManager interpreterSettingManager;
private SearchService noteSearchService;

private LuceneSearch noteSearchService;
private File indexDir;

@Before
public void startUp() throws IOException, SchedulerException {
public void startUp() throws IOException {
indexDir = Files.createTempDir().getAbsoluteFile();
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_SEARCH_INDEX_PATH.getVarName(), indexDir.getAbsolutePath());
noteSearchService = new LuceneSearch(ZeppelinConfiguration.create());
interpreterSettingManager = mock(InterpreterSettingManager.class);
InterpreterSetting defaultInterpreterSetting = mock(InterpreterSetting.class);
Expand All @@ -65,6 +70,7 @@ public void startUp() throws IOException, SchedulerException {
@After
public void shutDown() {
noteSearchService.close();
indexDir.delete();
}

// @Test
Expand Down

0 comments on commit 0242387

Please sign in to comment.