Permalink
Browse files

set limit because jruby threadContextMap seems to be leaked.. if up t…

…o limit, re-create ruby engine.

ScriptingContainer Singleton mode still leaked. changed SingleThread to reset.
  • Loading branch information...
fujibee committed Jan 24, 2010
1 parent 09783da commit b997a91cc44e7193f5bc5610d77c07f4c53d4da9
View
@@ -9,7 +9,7 @@ Gem::Specification.new do |s|
s.required_rubygems_version = Gem::Requirement.new(">= 0") if s.respond_to? :required_rubygems_version=
s.authors = ["Koichi Fujikawa"]
- s.date = %q{2010-01-21}
+ s.date = %q{2010-01-25}
s.default_executable = %q{joh}
s.description = %q{JRuby on Hadoop}
s.email = %q{fujibee@gmail.com}
View
Binary file not shown.
@@ -8,12 +8,18 @@
import javax.script.ScriptException;
import org.apache.hadoop.mapred.JobConf;
+import org.jruby.embed.LocalContextScope;
import org.jruby.embed.ScriptingContainer;
public class JRubyEvaluator {
private static final String WRAPPER_FILE_NAME = "ruby_wrapper.rb";
+ /** invoke count is limited so that memory leaking */
+ private static final int INVOKE_LIMIT = 10000;
+
+ private static int invokeCounter = 0;
+
private ScriptingContainer rubyEngine;
/** ruby script name kicked by Java at first */
@@ -23,35 +29,52 @@
private String dslFileName;
public JRubyEvaluator(JobConf conf) {
- rubyEngine = new ScriptingContainer();
- if (rubyEngine == null)
- throw new RuntimeException("cannot find jruby engine");
scriptFileName = conf.get("mapred.ruby.script");
dslFileName = conf.get("mapred.ruby.dslfile");
- try {
- // evaluate ruby library upfront
- rubyEngine.runScriptlet(readRubyWrapperFile(), WRAPPER_FILE_NAME);
- } catch (Exception e) {
- throw new RuntimeException("cannot find wrapper file", e);
- }
+
+ setupEngine();
}
public Object invoke(String methodName, Object conf) throws ScriptException {
- Object self = rubyEngine.get("self");
+ Object self = null; // if receiver is null, should use toplevel.
Object result = rubyEngine.callMethod(self, methodName, new Object[] {
conf, scriptFileName, dslFileName }, Object[].class);
+ invokeCounter++;
return result;
}
public Object invoke(String methodName, Object key, Object value,
Object output, Object reporter) throws ScriptException {
- Object self = rubyEngine.get("self");
+ Object self = null; // if receiver is null, should use toplevel.
Object result = rubyEngine.callMethod(self, methodName, new Object[] {
key, value, output, reporter, scriptFileName, dslFileName },
null);
+ invokeCounter++;
return result;
}
+ // check resouce and restart engine if over limit
+ public void checkResource() {
+ // now simply count because cannot check directly
+ // ThreadContextMap in ThreadService seems to be leaked
+ if (invokeCounter > INVOKE_LIMIT) {
+ invokeCounter = 0;
+ setupEngine();
+ }
+ }
+
+ private void setupEngine() {
+ rubyEngine = new ScriptingContainer(LocalContextScope.SINGLETHREAD);
+ if (rubyEngine == null)
+ throw new RuntimeException("cannot find jruby engine");
+ try {
+ // evaluate ruby library upfront
+ rubyEngine.runScriptlet(readRubyWrapperFile(), WRAPPER_FILE_NAME);
+ } catch (Exception e) {
+ throw new RuntimeException("cannot find wrapper file", e);
+ }
+ }
+
private Reader readRubyWrapperFile() throws FileNotFoundException {
InputStream is = Thread.currentThread().getContextClassLoader()
.getResourceAsStream(WRAPPER_FILE_NAME);
@@ -24,6 +24,8 @@ public void map(LongWritable key, Text value,
evaluator.invoke("wrap_map", key, value, output, reporter);
} catch (ScriptException e) {
reporter.setStatus(e.getMessage());
+ } finally {
+ evaluator.checkResource();
}
}
}
@@ -24,6 +24,8 @@ public void reduce(Text key, Iterator<IntWritable> values,
evaluator.invoke("wrap_reduce", key, values, output, reporter);
} catch (ScriptException e) {
reporter.setStatus(e.getMessage());
+ } finally {
+ evaluator.checkResource();
}
}
}

0 comments on commit b997a91

Please sign in to comment.