Skip to content

Commit

Permalink
Initial implementation of ResourceWatcherService
Browse files Browse the repository at this point in the history
Closes #4062
  • Loading branch information
imotov committed Nov 4, 2013
1 parent 1b799e9 commit c724f0d
Show file tree
Hide file tree
Showing 10 changed files with 1,085 additions and 49 deletions.
12 changes: 12 additions & 0 deletions docs/reference/modules/scripting.asciidoc
Expand Up @@ -68,6 +68,18 @@ This will still allow execution of named scripts provided in the config, or
_native_ Java scripts registered through plugins, however it will prevent
users from running arbitrary scripts via the API.

[float]
=== Automatic Script Reloading

added[0.90.6]

The `config/scripts` directory is scanned periodically for changes.
New and changed scripts are reloaded and deleted script are removed
from preloaded scripts cache. The reload frequency can be specified
using `watcher.interval` setting, which defaults to `60s`.
To disable script reloading completely set `script.auto_reload_enabled`
to `false`.

[float]
=== Native (Java) Scripts

Expand Down
Expand Up @@ -90,6 +90,8 @@
import org.elasticsearch.threadpool.ThreadPoolModule;
import org.elasticsearch.transport.TransportModule;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.watcher.ResourceWatcherModule;
import org.elasticsearch.watcher.ResourceWatcherService;

import java.util.Arrays;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -170,6 +172,7 @@ public InternalNode(Settings pSettings, boolean loadConfigSettings) throws Elast
modules.add(new BulkUdpModule());
modules.add(new ShapeModule());
modules.add(new PercolatorModule());
modules.add(new ResourceWatcherModule());

injector = modules.createInjector();

Expand Down Expand Up @@ -223,6 +226,7 @@ public Node start() {
injector.getInstance(HttpServer.class).start();
}
injector.getInstance(BulkUdpService.class).start();
injector.getInstance(ResourceWatcherService.class).start();

logger.info("started");

Expand All @@ -238,6 +242,7 @@ public Node stop() {
logger.info("stopping ...");

injector.getInstance(BulkUdpService.class).stop();
injector.getInstance(ResourceWatcherService.class).stop();
if (settings.getAsBoolean("http.enabled", true)) {
injector.getInstance(HttpServer.class).stop();
}
Expand Down
129 changes: 80 additions & 49 deletions src/main/java/org/elasticsearch/script/ScriptService.java
Expand Up @@ -23,9 +23,9 @@
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.Streams;
Expand All @@ -35,8 +35,10 @@
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.script.mvel.MvelScriptEngineService;
import org.elasticsearch.search.lookup.SearchLookup;
import org.elasticsearch.watcher.FileChangesListener;
import org.elasticsearch.watcher.FileWatcher;
import org.elasticsearch.watcher.ResourceWatcherService;

import java.io.File;
import java.io.FileInputStream;
Expand All @@ -59,18 +61,13 @@ public class ScriptService extends AbstractComponent {
private final ConcurrentMap<String, CompiledScript> staticCache = ConcurrentCollections.newConcurrentMap();

private final Cache<CacheKey, CompiledScript> cache;
private final File scriptsDirectory;

private final boolean disableDynamic;

public ScriptService(Settings settings) {
this(settings, new Environment(), ImmutableSet.<ScriptEngineService>builder()
.add(new MvelScriptEngineService(settings))
.build()
);
}

@Inject
public ScriptService(Settings settings, Environment env, Set<ScriptEngineService> scriptEngines) {
public ScriptService(Settings settings, Environment env, Set<ScriptEngineService> scriptEngines,
ResourceWatcherService resourceWatcherService) {
super(settings);

int cacheMaxSize = componentSettings.getAsInt("cache.max_size", 500);
Expand Down Expand Up @@ -100,45 +97,17 @@ public ScriptService(Settings settings, Environment env, Set<ScriptEngineService
// put some default optimized scripts
staticCache.put("doc.score", new CompiledScript("native", new DocScoreNativeScriptFactory()));

// compile static scripts
File scriptsFile = new File(env.configFile(), "scripts");
if (scriptsFile.exists()) {
processScriptsDirectory("", scriptsFile);
}
}

private void processScriptsDirectory(String prefix, File dir) {
for (File file : dir.listFiles()) {
if (file.isDirectory()) {
processScriptsDirectory(prefix + file.getName() + "_", file);
} else {
int extIndex = file.getName().lastIndexOf('.');
if (extIndex != -1) {
String ext = file.getName().substring(extIndex + 1);
String scriptName = prefix + file.getName().substring(0, extIndex);
boolean found = false;
for (ScriptEngineService engineService : scriptEngines.values()) {
for (String s : engineService.extensions()) {
if (s.equals(ext)) {
found = true;
try {
String script = Streams.copyToString(new InputStreamReader(new FileInputStream(file), Charsets.UTF_8));
staticCache.put(scriptName, new CompiledScript(engineService.types()[0], engineService.compile(script)));
} catch (Exception e) {
logger.warn("failed to load/compile script [{}]", e, scriptName);
}
break;
}
}
if (found) {
break;
}
}
if (!found) {
logger.warn("no script engine found for [{}]", ext);
}
}
}
// add file watcher for static scripts
scriptsDirectory = new File(env.configFile(), "scripts");
FileWatcher fileWatcher = new FileWatcher(scriptsDirectory);
fileWatcher.addListener(new ScriptChangesListener());

if (componentSettings.getAsBoolean("auto_reload_enabled", true)) {
// automatic reload is enabled - register scripts
resourceWatcherService.add(fileWatcher);
} else {
// automatic reload is disable just load scripts once
fileWatcher.init();
}
}

Expand Down Expand Up @@ -214,6 +183,68 @@ private boolean dynamicScriptDisabled(String lang) {
return !"native".equals(lang);
}

private class ScriptChangesListener extends FileChangesListener {

private Tuple<String, String> scriptNameExt(File file) {
String scriptPath = scriptsDirectory.toURI().relativize(file.toURI()).getPath();
int extIndex = scriptPath.lastIndexOf('.');
if (extIndex != -1) {
String ext = scriptPath.substring(extIndex + 1);
String scriptName = scriptPath.substring(0, extIndex).replace(File.separatorChar, '_');
return new Tuple<String, String>(scriptName, ext);
} else {
return null;
}
}

@Override
public void onFileInit(File file) {
Tuple<String, String> scriptNameExt = scriptNameExt(file);
if (scriptNameExt != null) {
boolean found = false;
for (ScriptEngineService engineService : scriptEngines.values()) {
for (String s : engineService.extensions()) {
if (s.equals(scriptNameExt.v2())) {
found = true;
try {
logger.trace("compiling script file " + file.getAbsolutePath());
String script = Streams.copyToString(new InputStreamReader(new FileInputStream(file), Charsets.UTF_8));
staticCache.put(scriptNameExt.v1(), new CompiledScript(engineService.types()[0], engineService.compile(script)));
} catch (Throwable e) {
logger.warn("failed to load/compile script [{}]", e, scriptNameExt.v1());
}
break;
}
}
if (found) {
break;
}
}
if (!found) {
logger.warn("no script engine found for [{}]", scriptNameExt.v2());
}
}
}

@Override
public void onFileCreated(File file) {
onFileInit(file);
}

@Override
public void onFileDeleted(File file) {
Tuple<String, String> scriptNameExt = scriptNameExt(file);
logger.trace("removing script file " + file.getAbsolutePath());
staticCache.remove(scriptNameExt.v1());
}

@Override
public void onFileChanged(File file) {
onFileInit(file);
}

}

public static class CacheKey {
public final String lang;
public final String script;
Expand Down
@@ -0,0 +1,79 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.watcher;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/**
* Abstract resource watcher framework, which handles adding and removing listeners
* and calling resource observer.
*/
public abstract class AbstractResourceWatcher<Listener> implements ResourceWatcher {
private final List<Listener> listeners = new CopyOnWriteArrayList<Listener>();
private boolean initialized = false;

@Override
public void init() {
if (!initialized) {
doInit();
initialized = true;
}
}

@Override
public void checkAndNotify() {
init();
doCheckAndNotify();
}

/**
* Registers new listener
*/
public void addListener(Listener listener) {
listeners.add(listener);
}

/**
* Unregisters a listener
*/
public void remove(Listener listener) {
listeners.remove(listener);
}

/**
* Returns a list of listeners
*/
protected List<Listener> listeners() {
return listeners;
}

/**
* Will be called once on initialization
*/
protected abstract void doInit();

/**
* Will be called periodically
* <p/>
* Implementing watcher should check resource and notify all {@link #listeners()}.
*/
protected abstract void doCheckAndNotify();

}
75 changes: 75 additions & 0 deletions src/main/java/org/elasticsearch/watcher/FileChangesListener.java
@@ -0,0 +1,75 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.watcher;

import java.io.File;

/**
* Callback interface that file changes File Watcher is using to notify listeners about changes.
*/
public class FileChangesListener {
/**
* Called for every file found in the watched directory during initialization
*/
public void onFileInit(File file) {

}

/**
* Called for every subdirectory found in the watched directory during initialization
*/
public void onDirectoryInit(File file) {

}

/**
* Called for every new file found in the watched directory
*/
public void onFileCreated(File file) {

}

/**
* Called for every file that disappeared in the watched directory
*/
public void onFileDeleted(File file) {

}

/**
* Called for every file that was changed in the watched directory
*/
public void onFileChanged(File file) {

}

/**
* Called for every new subdirectory found in the watched directory
*/
public void onDirectoryCreated(File file) {

}

/**
* Called for every file that disappeared in the watched directory
*/
public void onDirectoryDeleted(File file) {

}
}

0 comments on commit c724f0d

Please sign in to comment.