Skip to content

Commit

Permalink
Core: upgrade to current Lucene 5.0.0 snapshot
Browse files Browse the repository at this point in the history
Elasticsearch no longer unlocks the Lucene index on startup (this was
dangerous, and could possibly lead to corruption).

Added the new serbian_normalization TokenFilter from Lucene.

NoLockFactory is no longer supported (index.store.fs.fs_lock = none),
and if you have a typo in your fs_lock you'll now hit a StoreException
instead of silently using NoLockFactory.

Closes #8588
  • Loading branch information
mikemccand committed Nov 24, 2014
1 parent 866571f commit dfb6d60
Show file tree
Hide file tree
Showing 19 changed files with 129 additions and 125 deletions.
Expand Up @@ -34,3 +34,7 @@ Scandinavian::
http://lucene.apache.org/core/4_9_0/analyzers-common/org/apache/lucene/analysis/miscellaneous/ScandinavianNormalizationFilter.html[`scandinavian_normalization`],
http://lucene.apache.org/core/4_9_0/analyzers-common/org/apache/lucene/analysis/miscellaneous/ScandinavianFoldingFilter.html[`scandinavian_folding`]

Serbian::

not-released-yet[`serbian_normalization`],

4 changes: 2 additions & 2 deletions pom.xml
Expand Up @@ -32,7 +32,7 @@

<properties>
<lucene.version>5.0.0</lucene.version>
<lucene.maven.version>5.0.0-snapshot-1637347</lucene.maven.version>
<lucene.maven.version>5.0.0-snapshot-1641343</lucene.maven.version>
<tests.jvms>auto</tests.jvms>
<tests.shuffle>true</tests.shuffle>
<tests.output>onerror</tests.output>
Expand All @@ -54,7 +54,7 @@
</repository>
<repository>
<id>Lucene snapshots</id>
<url>https://download.elasticsearch.org/lucenesnapshots/1637347</url>
<url>https://download.elasticsearch.org/lucenesnapshots/1641343</url>
</repository>
</repositories>

Expand Down
10 changes: 6 additions & 4 deletions src/main/java/org/elasticsearch/env/NodeEnvironment.java
Expand Up @@ -22,6 +22,8 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.Lock;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.store.NativeFSLockFactory;
Expand Down Expand Up @@ -82,10 +84,10 @@ public NodeEnvironment(Settings settings, Environment environment) throws IOExce
if (Files.exists(dir) == false) {
Files.createDirectories(dir);
}
logger.trace("obtaining node lock on {} ...", dir.toAbsolutePath());
try {
NativeFSLockFactory lockFactory = new NativeFSLockFactory(dir);
Lock tmpLock = lockFactory.makeLock("node.lock");

try (Directory luceneDir = FSDirectory.open(dir, NativeFSLockFactory.INSTANCE)) {
logger.trace("obtaining node lock on {} ...", dir.toAbsolutePath());
Lock tmpLock = luceneDir.makeLock("node.lock");
boolean obtained = tmpLock.obtain();
if (obtained) {
locks[dirIndex] = tmpLock;
Expand Down
Expand Up @@ -511,6 +511,7 @@ public void processTokenFilters(TokenFiltersBindings tokenFiltersBindings) {
tokenFiltersBindings.processTokenFilter("persian_normalization", PersianNormalizationFilterFactory.class);
tokenFiltersBindings.processTokenFilter("scandinavian_normalization", ScandinavianNormalizationFilterFactory.class);
tokenFiltersBindings.processTokenFilter("scandinavian_folding", ScandinavianFoldingFilterFactory.class);
tokenFiltersBindings.processTokenFilter("serbian_normalization", SerbianNormalizationFilterFactory.class);

tokenFiltersBindings.processTokenFilter("hunspell", HunspellTokenFilterFactory.class);
tokenFiltersBindings.processTokenFilter("cjk_bigram", CJKBigramFilterFactory.class);
Expand Down
@@ -0,0 +1,43 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.analysis;

import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.sr.SerbianNormalizationFilter;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.assistedinject.Assisted;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.settings.IndexSettings;
/**
*
*/
public class SerbianNormalizationFilterFactory extends AbstractTokenFilterFactory {

@Inject
public SerbianNormalizationFilterFactory(Index index, @IndexSettings Settings indexSettings, @Assisted String name, @Assisted Settings settings) {
super(index, indexSettings, name, settings);
}

@Override
public TokenStream create(TokenStream tokenStream) {
return new SerbianNormalizationFilter(tokenStream);
}
}
Expand Up @@ -1434,11 +1434,6 @@ private static boolean isMergedSegment(LeafReader reader) {

private IndexWriter createWriter() throws IOException {
try {
// release locks when started
if (IndexWriter.isLocked(store.directory())) {
logger.warn("shard is locked, releasing lock");
IndexWriter.unlock(store.directory());
}
boolean create = !Lucene.indexExists(store.directory());
IndexWriterConfig config = new IndexWriterConfig(analysisService.defaultIndexAnalyzer());
config.setCommitOnClose(false); // we by default don't commit on close
Expand Down
Expand Up @@ -264,7 +264,7 @@ public static class CustomBinaryDocValuesField extends NumberFieldMapper.CustomN

public static final FieldType TYPE = new FieldType();
static {
TYPE.setDocValueType(DocValuesType.BINARY);
TYPE.setDocValuesType(DocValuesType.BINARY);
TYPE.freeze();
}

Expand Down
Expand Up @@ -404,7 +404,7 @@ public static class CustomDoubleNumericDocValuesField extends CustomNumericDocVa

public static final FieldType TYPE = new FieldType();
static {
TYPE.setDocValueType(DocValuesType.BINARY);
TYPE.setDocValuesType(DocValuesType.BINARY);
TYPE.freeze();
}

Expand Down
Expand Up @@ -409,7 +409,7 @@ public static class CustomFloatNumericDocValuesField extends CustomNumericDocVal

public static final FieldType TYPE = new FieldType();
static {
TYPE.setDocValueType(DocValuesType.BINARY);
TYPE.setDocValuesType(DocValuesType.BINARY);
TYPE.freeze();
}

Expand Down
Expand Up @@ -433,7 +433,7 @@ public static abstract class CustomNumericDocValuesField implements IndexableFie

public static final FieldType TYPE = new FieldType();
static {
TYPE.setDocValueType(DocValuesType.BINARY);
TYPE.setDocValuesType(DocValuesType.BINARY);
TYPE.freeze();
}

Expand Down Expand Up @@ -484,7 +484,7 @@ public static class CustomLongNumericDocValuesField extends CustomNumericDocValu

public static final FieldType TYPE = new FieldType();
static {
TYPE.setDocValueType(DocValuesType.BINARY);
TYPE.setDocValuesType(DocValuesType.BINARY);
TYPE.freeze();
}

Expand Down
Expand Up @@ -728,7 +728,7 @@ public static class CustomGeoPointDocValuesField extends CustomNumericDocValuesF

public static final FieldType TYPE = new FieldType();
static {
TYPE.setDocValueType(DocValuesType.BINARY);
TYPE.setDocValuesType(DocValuesType.BINARY);
TYPE.freeze();
}

Expand Down
Expand Up @@ -151,6 +151,11 @@ protected void afterMerge(OnGoingMerge merge) {
super.afterMerge(merge);
provider.afterMerge(merge);
}

@Override
protected void maybeStall() {
// Don't stall here, because we do our own index throttling (in InternalEngine.IndexThrottle) when merges can't keep up
}
}

class ApplySettings implements IndexSettingsService.Listener {
Expand Down
126 changes: 34 additions & 92 deletions src/main/java/org/elasticsearch/index/store/DistributorDirectory.java
Expand Up @@ -33,7 +33,7 @@
* A directory implementation that uses the Elasticsearch {@link Distributor} abstraction to distribute
* files across multiple data directories.
*/
public final class DistributorDirectory extends BaseDirectory {
public final class DistributorDirectory extends Directory {

private final Distributor distributor;
private final HashMap<String, Directory> nameDirMapping = new HashMap<>();
Expand Down Expand Up @@ -74,7 +74,6 @@ public DistributorDirectory(Distributor distributor) throws IOException {
nameDirMapping.put(file, dir);
}
}
lockFactory = new DistributorLockFactoryWrapper(distributor.primary());
}

@Override
Expand Down Expand Up @@ -132,23 +131,22 @@ public synchronized void close() throws IOException {
} finally {
IOUtils.close(distributor.all());
}

}

/**
* Returns the directory that has previously been associated with this file name.
*
* @throws IOException if the name has not yet been associated with any directory ie. fi the file does not exists
*/
Directory getDirectory(String name) throws IOException { // pkg private for testing
synchronized Directory getDirectory(String name) throws IOException { // pkg private for testing
return getDirectory(name, true);
}

/**
* Returns the directory that has previously been associated with this file name or associates the name with a directory
* if failIfNotAssociated is set to false.
*/
private Directory getDirectory(String name, boolean failIfNotAssociated) throws IOException {
private synchronized Directory getDirectory(String name, boolean failIfNotAssociated) throws IOException {
final Directory directory = nameDirMapping.get(name);
if (directory == null) {
if (failIfNotAssociated) {
Expand All @@ -164,17 +162,6 @@ private Directory getDirectory(String name, boolean failIfNotAssociated) throws
return directory;
}

@Override
public synchronized void setLockFactory(LockFactory lockFactory) throws IOException {
distributor.primary().setLockFactory(lockFactory);
super.setLockFactory(new DistributorLockFactoryWrapper(distributor.primary()));
}

@Override
public synchronized String getLockID() {
return distributor.primary().getLockID();
}

@Override
public synchronized String toString() {
return distributor.toString();
Expand All @@ -201,8 +188,8 @@ private synchronized boolean assertConsistency() throws IOException {
.append(System.lineSeparator());
} else if (directory != d) {
consistent = false;
builder.append("File ").append(file).append(" was mapped to a directory ").append(directory)
.append(" but exists in another distributor directory").append(d)
builder.append("File ").append(file).append(" was mapped to a directory ").append(directory)
.append(" but exists in another distributor directory ").append(d)
.append(System.lineSeparator());
}

Expand All @@ -212,86 +199,41 @@ private synchronized boolean assertConsistency() throws IOException {
return consistent; // return boolean so it can be easily be used in asserts
}

/**
* This inner class is a simple wrapper around the original
* lock factory to track files written / created through the
* lock factory. For instance {@link NativeFSLockFactory} creates real
* files that we should expose for consistency reasons.
*/
private class DistributorLockFactoryWrapper extends LockFactory {
private final Directory dir;
private final LockFactory delegate;
private final boolean writesFiles;

public DistributorLockFactoryWrapper(Directory dir) {
this.dir = dir;
final FSDirectory leaf = DirectoryUtils.getLeaf(dir, FSDirectory.class);
if (leaf != null) {
writesFiles = leaf.getLockFactory() instanceof FSLockFactory;
} else {
writesFiles = false;
}
this.delegate = dir.getLockFactory();
}

@Override
public void setLockPrefix(String lockPrefix) {
delegate.setLockPrefix(lockPrefix);
}

@Override
public String getLockPrefix() {
return delegate.getLockPrefix();
}

@Override
public Lock makeLock(String lockName) {
return new DistributorLock(delegate.makeLock(lockName), lockName);
}

@Override
public void clearLock(String lockName) throws IOException {
delegate.clearLock(lockName);
}

@Override
public String toString() {
return "DistributorLockFactoryWrapper(" + delegate.toString() + ")";
}

private class DistributorLock extends Lock {
private final Lock delegateLock;
private final String name;

DistributorLock(Lock delegate, String name) {
this.delegateLock = delegate;
this.name = name;
}

@Override
public boolean obtain() throws IOException {
if (delegateLock.obtain()) {
if (writesFiles) {
synchronized (DistributorDirectory.this) {
assert (nameDirMapping.containsKey(name) == false || nameDirMapping.get(name) == dir);
if (nameDirMapping.get(name) == null) {
nameDirMapping.put(name, dir);
@Override
public Lock makeLock(final String lockName) {
final Directory primary = distributor.primary();
final Lock delegateLock = primary.makeLock(lockName);
if (DirectoryUtils.getLeaf(primary, FSDirectory.class) != null) {
// Wrap the delegate's lock just so we can monitor when it actually wrote a lock file. We assume that an FSDirectory writes its
// locks as actual files (we don't support NoLockFactory):
return new Lock() {
@Override
public boolean obtain() throws IOException {
if (delegateLock.obtain()) {
synchronized(DistributorDirectory.this) {
assert nameDirMapping.containsKey(lockName) == false || nameDirMapping.get(lockName) == primary;
if (nameDirMapping.get(lockName) == null) {
nameDirMapping.put(lockName, primary);
}
}
return true;
} else {
return false;
}
return true;
} else {
return false;
}
}

@Override
public void close() throws IOException { delegateLock.close(); }
@Override
public void close() throws IOException {
delegateLock.close();
}

@Override
public boolean isLocked() throws IOException {
return delegateLock.isLocked();
}
@Override
public boolean isLocked() throws IOException {
return delegateLock.isLocked();
}
};
} else {
return delegateLock;
}
}
}

0 comments on commit dfb6d60

Please sign in to comment.