<?xml version="1.0" encoding="UTF-8"?>
<commit>
  <added type="array">
    <added>
      <filename>pydisco/disco/comm.py</filename>
    </added>
    <added>
      <filename>test/test_partfile.py</filename>
    </added>
  </added>
  <modified type="array">
    <modified>
      <diff>@@ -9,4 +9,4 @@ DISCO_USER=$USER
 DISCO_LOG=$DISCO
 DISCO_PID_DIR=$DISCO
 DISCO_MASTER_PORT=7000
-
+#DISCO_FLAGS=resultfs</diff>
      <filename>conf/disco.conf.dev-example</filename>
    </modified>
    <modified>
      <diff>@@ -1,12 +1,12 @@
 
 -module(disco_config).
--export([get_config_table/0, save_config_table/1]).
+-export([get_config_table/0, save_config_table/1, expand_range/2]).
 
 config_file() -&gt;
         {ok, Val} = application:get_env(disco_config),
         Val.
 
-add_nodes([FirstNode, Max], Instances) -&gt;
+expand_range(FirstNode, Max) -&gt;
         Len = string:len(FirstNode),
         FieldLen = string:len(Max),
         MaxNum = list_to_integer(Max),
@@ -14,9 +14,11 @@ add_nodes([FirstNode, Max], Instances) -&gt;
         MinNum = list_to_integer(
                 string:sub_string(FirstNode, Len - FieldLen + 1)),
         Format = lists:flatten(io_lib:format(&quot;~s~~.~w.0w&quot;, [Name, FieldLen])),
+        [lists:flatten(io_lib:fwrite(Format, [I])) ||
+                I &lt;- lists:seq(MinNum, MaxNum)].
 
-        [{lists:flatten(io_lib:fwrite(Format, [I])), Instances} ||
-                I &lt;- lists:seq(MinNum, MaxNum)];
+add_nodes([FirstNode, Max], Instances) -&gt;
+        [{N, Instances} || N &lt;- expand_range(FirstNode, Max)];
 
 add_nodes([Node], Instances) -&gt; {Node, Instances}.
         </diff>
      <filename>master/src/disco_config.erl</filename>
    </modified>
    <modified>
      <diff>@@ -177,8 +177,8 @@ handle_call({purge_job, JobName}, From, State) -&gt;
                 {ok, Root} = application:get_env(disco_root),
                 handle_call({clean_job, JobName}, From, State),
                 Nodes = [lists:flatten(
-                        [&quot;dir://&quot;, Node, &quot;/map/&quot;, 
-                        Node, &quot;/&quot;, jobhome(JobName)]) ||
+                        [&quot;dir://&quot;, Node, &quot;/&quot;, Node, &quot;/&quot;,
+                                jobhome(JobName), &quot;/null&quot;]) ||
                                 {Node, _} &lt;- ets:tab2list(node_load)],
                 garbage_collect:remove_job(Nodes),
                 garbage_collect:remove_dir(filename:join([Root, jobhome(JobName)]))</diff>
      <filename>master/src/disco_server.erl</filename>
    </modified>
    <modified>
      <diff>@@ -1,16 +1,18 @@
 
 -module(garbage_collect).
--export([remove_map_results/1, remove_job/1, remove_dir/1]).
+-export([remove_map_results/1, remove_job/1, remove_dir/1, move_results/1]).
+
+-define(RESULTFS_TIMEOUT, 300000). % 5min
 
 spawn_remote([], _) -&gt; ok;
 spawn_remote([Url|Urls], F) when is_binary(Url) -&gt;
         spawn_remote([binary_to_list(Url)|Urls], F);
 spawn_remote([Url|Urls], F) when is_list(Url) -&gt;
-        [&quot;dir:&quot;, Node, _|S0] = string:tokens(Url, &quot;/&quot;),
-        S = filename:join(S0),
+        [&quot;dir:&quot;, Node, _, Pref, JobName, File] = string:tokens(Url, &quot;/&quot;),
+        JobRoot = filename:join([Node, Pref, JobName]),
         SName = disco_worker:slave_name(Node),
         case net_adm:ping(SName) of 
-                pong -&gt; spawn(SName, fun () -&gt; F(S, Url) end);
+                pong -&gt; spawn(SName, fun () -&gt; F(File, JobRoot, Node) end);
                 _ -&gt; ok
         end,
         spawn_remote(Urls, F).
@@ -21,18 +23,62 @@ spawn_remote([Url|Urls], F) when is_list(Url) -&gt;
 remove_dir(Dir) -&gt;
         spawn(fun() -&gt; os:cmd(&quot;rm -Rf &quot; ++ Dir) end).
 
-remove_map_results(Urls) -&gt;
-        spawn_remote(Urls, fun (JobName, Url) -&gt;
+remove_job(Urls) -&gt;
+        spawn_remote(Urls, fun (_, JobRoot, Node) -&gt;
                 Root = os:getenv(&quot;DISCO_ROOT&quot;),
-                error_logger:info_report({&quot;Deleting map results at&quot;, Url}),
-                os:cmd(&quot;rm -Rf &quot; ++
-                        filename:join([Root, &quot;data&quot;, JobName, &quot;map-*&quot;]))
+                error_logger:info_report({&quot;Deleting all job files at&quot;, Node, JobRoot}),
+                remove_dir(filename:join([Root, &quot;data&quot;, JobRoot])),
+                remove_dir(filename:join([Root, &quot;temp&quot;, JobRoot]))
         end).
 
-remove_job(Urls) -&gt;
-        spawn_remote(Urls, fun (JobName, Url) -&gt;
+remove_map_results(Urls) -&gt;
+        spawn_remote(Urls, fun (File, JobRoot, _) -&gt;
                 Root = os:getenv(&quot;DISCO_ROOT&quot;),
-                error_logger:info_report({&quot;Deleting all job files at&quot;, Url}),
-                remove_dir(filename:join([Root, &quot;data&quot;, JobName]))
+                lists:foreach(fun(F) -&gt;
+                        C = &quot;rm -Rf &quot; ++
+                                filename:join([Root, &quot;data&quot;, JobRoot, F]),
+                        os:cmd(C)
+                end, parse_dir(File, JobRoot))
         end).
-        
+
+move_results(Urls) -&gt;
+        Parent = self(),
+        spawn_remote(Urls, fun (File, JobRoot, Node) -&gt;
+                Root = os:getenv(&quot;DISCO_ROOT&quot;),
+                Src = filename:join([Root, &quot;temp&quot;, JobRoot]),
+                Dst = filename:join([Root, &quot;data&quot;, JobRoot]),
+                os:cmd(&quot;mkdir -p &quot; ++ Dst ++ &quot;/oob&quot;),
+                lists:foreach(fun(F) -&gt;
+                        SrcF = filename:join(Src, F),
+                        X = os:cmd(&quot;mv &quot; ++ SrcF ++ &quot; &quot; ++ Dst),
+                        if X =/= [] -&gt;
+                                Parent ! {resultfs_error, Node, X};
+                        true -&gt; ok
+                        end
+                end, parse_dir(File, JobRoot)),
+                os:cmd(&quot;mv &quot; ++ Src ++ &quot;/oob/* &quot; ++ Dst ++ &quot;/oob&quot;),
+                Parent ! {resultfs_node_ok, Node}
+        end),
+        wait_nodes(length(Urls)).
+
+wait_nodes(0) -&gt; ok;
+wait_nodes(N) -&gt;
+        receive
+                {resultfs_node_ok, _} -&gt;
+                        wait_nodes(N - 1);
+                {resultfs_error, Node, Error} -&gt;
+                        {error, Node, Error};
+                _ -&gt;
+                        wait_nodes(N)
+        after ?RESULTFS_TIMEOUT -&gt;
+                timeout
+        end.
+
+parse_dir(File, JobRoot) -&gt;
+        Root = os:getenv(&quot;DISCO_ROOT&quot;),
+        case string:tokens(File, &quot;:&quot;) of
+                [B, N] -&gt; disco_config:expand_range(B, N);
+                [Index] -&gt;
+                        IFile = filename:join([Root, &quot;temp&quot;, JobRoot, File]),
+                        [Index|string:tokens(os:cmd(&quot;cat &quot; ++ IFile), &quot;\n&quot;)]
+        end.</diff>
      <filename>master/src/garbage_collect.erl</filename>
    </modified>
    <modified>
      <diff>@@ -225,6 +225,37 @@ check_failure_rate(Name, PartID, Mode, L) -&gt;
                 ok
         end.
 
+kill_job(Name, Msg, P, Type) -&gt;
+        event_server:event(Name, Msg, P, []),
+        gen_server:call(disco_server, {kill_job, Name}),
+        gen_server:cast(event_server, {flush_events, Name}),
+        exit(Type).
+
+resultfs_enabled() -&gt;
+    S = case os:getenv(&quot;DISCO_FLAGS&quot;) of
+            false -&gt; [];
+            X -&gt; string:tokens(string:to_lower(X), &quot; &quot;)
+    end,
+    [ 1 || &quot;resultfs&quot; &lt;- S] =/= [].
+
+move_to_resultfs(_, _, false) -&gt; ok;
+move_to_resultfs(Name, R, _) -&gt;
+        event_server:event(Name, &quot;Moving results to resultfs&quot;, [], []),
+        case catch garbage_collect:move_results(R) of
+                ok -&gt; ok;
+                {error, Node, Error} -&gt;
+                        kill_job(Name,
+                        &quot;ERROR: Moving to resultfs failed on ~s: ~s&quot;,
+                                [Node, Error], logged_error);
+                timeout -&gt;
+                        kill_job(Name,
+                        &quot;ERROR: Moving to resultfs failed (timeout)&quot;,
+                                [], logged_error);
+                Error -&gt;
+                        kill_job(Name, 
+                        &quot;ERROR: Moving to resultfs failed: ~p&quot;, 
+                                [Error], unknown_error)
+        end.
 
 % run_task() is a common supervisor for both the map and reduce tasks.
 % Its main function is to catch and report any errors that occur during
@@ -239,21 +270,17 @@ run_task(Inputs, Mode, Name, MaxN) -&gt;
         case catch work(Inputs, Mode, Name, 0, MaxN, {Results, Failures}) of
                 ok -&gt; ok;
                 logged_error -&gt;
-                        event_server:event(Name, 
+                        kill_job(Name, 
                         &quot;ERROR: Job terminated due to the previous errors&quot;,
-                                [], []),
-                        gen_server:call(disco_server, {kill_job, Name}),
-                        gen_server:cast(event_server, {flush_events, Name}),
-                        exit(logged_error);
+                                [], logged_error);
                 Error -&gt;
-                        event_server:event(Name, 
+                        kill_job(Name, 
                         &quot;ERROR: Job coordinator failed unexpectedly: ~p&quot;, 
-                                [Error], []),
-                        gen_server:call(disco_server, {kill_job, Name}),
-                        gen_server:cast(event_server, {flush_events, Name}),
-                        exit(unknown_error)
+                                [Error], unknown_error)
         end,
+
         R = [list_to_binary(X) || {X, _} &lt;- ets:tab2list(Results)],
+        move_to_resultfs(Name, R, resultfs_enabled()),
         ets:delete(Results),
         ets:delete(Failures),
         R.</diff>
      <filename>master/src/handle_job.erl</filename>
    </modified>
    <modified>
      <diff>@@ -2,16 +2,11 @@
 import os, sys
 
 from disconode import disco_worker as dw
-from disconode import external
-from disconode.util import ensure_path 
-
-from disconode.disco_worker import\
-        HTTP_PORT, LOCAL_PATH, PARAMS_FILE, EXT_MAP, EXT_REDUCE,\
-        MAP_OUTPUT, CHUNK_OUTPUT, REDUCE_DL, REDUCE_SORTED, REDUCE_OUTPUT,\
-        OOB_FILE
+from disconode.util import ensure_path, ensure_file 
 
+from disconode.disco_worker import JOB_ROOT, OOB_FILE, PARAMS_FILE
 from disco.netstring import decode_netstring_fd
-from disco import util
+from disco import util, comm
 
 if __name__ == &quot;__main__&quot;:
         if len(sys.argv) &lt; 7:
@@ -28,12 +23,19 @@ if __name__ == &quot;__main__&quot;:
         master_url = sys.argv[4]
 
         ensure_path(OOB_FILE % &quot;&quot;, False)
-
+        
         try:
-                url = &quot;%s/params&quot; % master_url
-                external.ensure_file(PARAMS_FILE, url = url, mode = 444)
+                if util.resultfs_enabled:
+                        x, x, root = util.load_conf()
+                        url = &quot;%s/data/%s/params&quot; % (root, &quot;/&quot;.join(\
+                                master_url.strip(&quot;/&quot;).split(&quot;/&quot;)[-3:]))
+                        fn = lambda: file(url).read()
+                else:
+                        url = &quot;%s/params&quot; % master_url
+                        fn = lambda: comm.download(url)
+                ensure_file(PARAMS_FILE, fn, mode = 444)
         except Exception, e:
-                util.data_err(&quot;Failed to get %s&quot; % url, master_url)
+                util.data_err(&quot;Failed to get %s: %s&quot; % (url, e), master_url)
 
         try:
                 m = decode_netstring_fd(file(PARAMS_FILE))</diff>
      <filename>node/disco-worker</filename>
    </modified>
    <modified>
      <diff>@@ -1,15 +1,12 @@
 import os, subprocess, cStringIO, marshal, time, sys, cPickle, md5
 import re, traceback, tempfile, struct, random
-from disco.util import parse_dir, load_conf, err, data_err, msg
+from disco.util import\
+    parse_dir, load_conf, err, data_err, msg, resultfs_enabled, load_oob
 from disco.func import re_reader, netstr_reader
 from disco.netstring import *
 from disconode.util import *
 from disconode import external
-
-try:
-        import disco.comm_curl as comm
-except:
-        import disco.comm_httplib as comm
+from disco import comm
 
 oob_chars = re.compile(&quot;[^a-zA-Z_\-:0-9]&quot;)
 
@@ -60,30 +57,36 @@ def this_inputs():
         return sys.argv[6:]
 
 def init():
-        global HTTP_PORT, LOCAL_PATH, PARAMS_FILE, EXT_MAP, EXT_REDUCE,\
-               PART_SUFFIX, MAP_OUTPUT, CHUNK_OUTPUT, REDUCE_DL,\
-               REDUCE_SORTED, REDUCE_OUTPUT, OOB_FILE, OOB_URL,\
-               JOB_HOME
+        global HTTP_PORT, PARAMS_FILE, EXT_MAP, EXT_REDUCE,\
+               PART_SUFFIX, MAP_OUTPUT, REDUCE_DL,\
+               REDUCE_SORTED, REDUCE_OUTPUT, OOB_FILE,\
+               JOB_HOME, DISCO_ROOT, JOB_ROOT, PART_OUTPUT,\
+               MAP_INDEX, REDUCE_INDEX
 
-        tmp, HTTP_PORT, LOCAL_PATH = load_conf()
+        tmp, HTTP_PORT, DISCO_ROOT = load_conf()
         job_name = this_name()
 
         JOB_HOME = &quot;%s/%s/%s/&quot; %\
                 (this_host(), md5.md5(job_name).hexdigest()[:2], job_name)
-        pp = LOCAL_PATH + &quot;/&quot; + JOB_HOME
+        JOB_ROOT = &quot;%s/data/%s&quot; % (DISCO_ROOT, JOB_HOME)
+
+        if resultfs_enabled:
+                pp = &quot;%s/temp/%s&quot; % (DISCO_ROOT, JOB_HOME)
+        else:
+                pp = JOB_ROOT
 
-        OOB_URL = (&quot;http://%s/disco/ctrl/oob_get?&quot; % this_master())\
-                        + &quot;name=%s&amp;key=%s&quot;
-        PARAMS_FILE = pp + &quot;params&quot;
-        EXT_MAP = pp + &quot;ext-map&quot;
-        EXT_REDUCE = pp + &quot;ext-reduce&quot;
+        PARAMS_FILE = pp + &quot;params.dl&quot;
+        EXT_MAP = pp + &quot;ext.map&quot;
+        EXT_REDUCE = pp + &quot;ext.reduce&quot;
         PART_SUFFIX = &quot;-%.9d&quot;
         MAP_OUTPUT = pp + &quot;map-disco-%d&quot; + PART_SUFFIX
-        CHUNK_OUTPUT = pp + &quot;map-chunk-%d&quot;
+        PART_OUTPUT = pp + &quot;part-disco-%.9d&quot;
         REDUCE_DL = pp + &quot;reduce-in-%d.dl&quot;
         REDUCE_SORTED = pp + &quot;reduce-in-%d.sorted&quot;
         REDUCE_OUTPUT = pp + &quot;reduce-disco-%d&quot;
         OOB_FILE = pp + &quot;oob/%s&quot;
+        MAP_INDEX = pp + &quot;map-index.txt&quot;
+        REDUCE_INDEX = pp + &quot;reduce-index.txt&quot;
 
 def put(key, value):
         if oob_chars.match(key):
@@ -96,45 +99,33 @@ def put(key, value):
 
 def get(key, job = None):
         try:
-                if job:
-                        url = OOB_URL % (job, key)
-                else:
-                        url = OOB_URL % (this_name(), key)
-                return comm.download(url, redir = True)
+                job = job or this_name()
+                return load_oob(&quot;http://&quot; + this_master(), job, key)
         except comm.CommException, x:
                 data_err(&quot;OOB key (%s) not found at %s: HTTP status '%s'&quot; %\
                         (key, url, x.http_code), key)
 
-def open_local(input, fname, is_chunk):
+def open_local(input, fname):
         try:
                 f = file(fname)
-                if is_chunk:
-                        f.seek(this_partition() * 8)
-                        start, end = struct.unpack(&quot;QQ&quot;, f.read(16))
-                        sze = end - start
-                        f.seek(start)
-                else:
-                        sze = os.stat(fname).st_size
+                sze = os.stat(fname).st_size
                 return sze, f
         except:
                 data_err(&quot;Can't access a local input file (%s): %s&quot;\
                                 % (input, fname), input)
 
-def open_remote(input, ext_host, ext_file, is_chunk):
+def open_remote(input, ext_host, ext_file):
         try:
-                return comm.open_remote(&quot;http://%s%s&quot; % (ext_host, ext_file),
-                        this_partition(), is_chunk)
+                return comm.open_remote(&quot;http://%s%s&quot; % (ext_host, ext_file))
         except Exception, x:
                 data_err(&quot;Can't access an external input file (%s/%s): %s&quot;\
                                 % (ext_host, ext_file, x), x)
 
 def connect_input(input):
-
-        is_chunk = input.startswith(&quot;chunk://&quot;)
-
-        if input.startswith(&quot;disco://&quot;) or is_chunk:
+        
+        if input.startswith(&quot;disco://&quot;):
                 host, fname = input[8:].split(&quot;/&quot;, 1)
-                local_file = LOCAL_PATH + fname
+                local_file = &quot;%s/data/%s&quot; % (DISCO_ROOT, fname)
                 ext_host = &quot;%s:%s&quot; % (host, HTTP_PORT)
                 ext_file = &quot;/&quot; + fname
 
@@ -146,24 +137,20 @@ def connect_input(input):
 
         elif input.startswith(&quot;raw://&quot;):
                 return len(input) - 6, cStringIO.StringIO(input[6:])
-
         else:
                 host = this_host()
-                if input.startswith(&quot;chunkfile://&quot;):
-                        is_chunk = True
-                        local_file = LOCAL_PATH + input[12:]
-                elif input.startswith(&quot;dfs://&quot;):
+                if input.startswith(&quot;dfs://&quot;):
                         t, path = input[6:].split(&quot;/&quot;, 1)
-                        local_file = &quot;%s/input/%s&quot; % (LOCAL_PATH[:-6], path)
+                        local_file = &quot;%s/input/%s&quot; % (DISCO_ROOT, path)
                 elif input.startswith(&quot;file://&quot;):
                         local_file = input[7:]
                 else:
                         local_file = input
 
-        if host == this_host() and local_file:
-                return open_local(input, local_file, is_chunk)
+        if local_file and (resultfs_enabled or host == this_host()):
+                return open_local(input, local_file)
         else:
-                return open_remote(input, ext_host, ext_file, is_chunk)
+                return open_remote(input, ext_host, ext_file)
 
 class MapOutput:
         def __init__(self, part, params, combiner = None):
@@ -227,9 +214,12 @@ class ReduceReader:
                 part = PART_SUFFIX % this_partition()
                 for input in input_files:
                         if input.startswith(&quot;dir://&quot;):
-                                self.inputs += [x for x in parse_dir(input)\
-                                        if x.startswith(&quot;chunk://&quot;) or\
-                                           x.endswith(part)]
+                                try:
+                                        self.inputs += parse_dir(input,
+                                                part_id = this_partition())
+                                except:
+                                        data_err(&quot;Couldn't resolved address %s&quot;\
+                                                % input, input)
                         else:
                                 self.inputs.append(input)
 
@@ -336,23 +326,9 @@ def run_map(job_input, partitions, param):
 
         msg(&quot;Done: %d entries mapped in total&quot; % i)
 
-def merge_chunks(partitions):
-        mapout = CHUNK_OUTPUT % this_partition()
-     
-        f = file(mapout + &quot;.partial&quot;, &quot;w&quot;)
-        offset = (len(partitions) + 1) * 8
-        for p in partitions:
-                f.write(struct.pack(&quot;Q&quot;, offset))
-                offset += os.stat(p.fname).st_size
-        f.write(struct.pack(&quot;Q&quot;, offset))
-        f.close()
-
-        if subprocess.call(&quot;cat %s &gt;&gt; %s.partial&quot; % 
-                        (&quot; &quot;.join([p.fname for p in partitions]),
-                                mapout), shell = True):
-                data_err(&quot;Couldn't create a chunk&quot;, mapout)
-        os.rename(mapout + &quot;.partial&quot;, mapout)
-        for p in partitions:
+def merge_partitions(partitions):
+        for i, p in enumerate(partitions):
+                safe_append(file(p.fname), PART_OUTPUT % i)
                 os.remove(p.fname)
 
 def import_modules(modules, funcs):
@@ -370,6 +346,7 @@ def op_map(job):
                         &quot; &quot;.join(job_input))
 
         nr_reduces = int(job['nr_reduces'])
+        nr_part = max(1, nr_reduces)
         fun_map_reader.func_code = marshal.loads(job['map_reader'])
         fun_map_writer.func_code = marshal.loads(job['map_writer'])
         fun_partition.func_code = marshal.loads(job['partition'])
@@ -396,18 +373,27 @@ def op_map(job):
         if 'combiner' in job:
                 fun_combiner.func_code = marshal.loads(job['combiner'])
                 partitions = [MapOutput(i, map_params, fun_combiner)\
-                        for i in range(nr_reduces)]
+                        for i in range(nr_part)]
         else:
-                partitions = [MapOutput(i, map_params) for i in range(nr_reduces)]
+                partitions = [MapOutput(i, map_params) for i in range(nr_part)]
         
         run_map(job_input[0], partitions, map_params)
+        external.close_ext()
+        
         for p in partitions:
                 p.close()
-        if 'chunked' in job:
-                merge_chunks(partitions)
-        
-        external.close_ext()
-        msg(&quot;dir://%s/map/%s&quot; % (this_host(), JOB_HOME), &quot;OUT&quot;)
+
+        if nr_reduces:
+                merge_partitions(partitions)
+                n = os.path.basename(PART_OUTPUT % 0)
+                msg(&quot;dir://%s/%s%s:%d&quot; % (this_host(), JOB_HOME, n,
+                        len(partitions) - 1), &quot;OUT&quot;)
+        else:
+                res = [os.path.basename(p.fname) for p in partitions]
+                index = cStringIO.StringIO(&quot;\n&quot;.join(res) + &quot;\n&quot;)
+                safe_append(index, MAP_INDEX)
+                msg(&quot;dir://%s/%smap-index.txt&quot; %\
+                        (this_host(), JOB_HOME), &quot;OUT&quot;)
 
 def op_reduce(job):
         job_inputs = this_inputs()
@@ -447,7 +433,9 @@ def op_reduce(job):
         
         red_out.close()
         external.close_ext()
-
-        msg(&quot;dir://%s/reduce/%s&quot; % (this_host(), JOB_HOME), &quot;OUT&quot;)
+        
+        index = cStringIO.StringIO(os.path.basename(red_out.fname) + &quot;\n&quot;)
+        safe_append(index, REDUCE_INDEX)
+        msg(&quot;dir://%s/%sreduce-index.txt&quot; % (this_host(), JOB_HOME), &quot;OUT&quot;)
 
 init()</diff>
      <filename>node/disconode/disco_worker.py</filename>
    </modified>
    <modified>
      <diff>@@ -2,11 +2,7 @@ import os, os.path, time, struct, marshal
 from subprocess import *
 from disco.netstring import decode_netstring_str
 from disconode.util import *
-
-try:
-        import disco.comm_curl as comm
-except:
-        import disco.comm_httplib as comm
+from disco.util import msg
 
 MAX_ITEM_SIZE = 1024**3
 MAX_NUM_OUTPUT = 1000000
@@ -79,6 +75,7 @@ def prepare(ext_job, params, path):
         open_ext(path + &quot;/op&quot;, params)
 
 def open_ext(fname, params):
+        # XXX! Run external programs in /data/ dir, not /temp/
         global proc, in_fd, out_fd
         proc = Popen([fname], stdin = PIPE, stdout = PIPE)
         in_fd = proc.stdin
@@ -94,28 +91,4 @@ def write_files(ext_data, path):
         for fname, data in ext_data.iteritems():
                 ensure_file(path + &quot;/&quot; + fname, data = data)
 
-def ensure_file(fname, data = None, url = None, timeout = 60, mode = 500):
-        while timeout &gt; 0:
-                if os.path.exists(fname):
-                        return
-                try:
-                        fd = os.open(fname + &quot;.partial&quot;,
-                                os.O_CREAT | os.O_EXCL | os.O_WRONLY, 500)
-                        if data:
-                                os.write(fd, data)
-                        else:
-                                os.write(fd, comm.download(url))
-                        os.close(fd)
-                        os.rename(fname + &quot;.partial&quot;, fname)
-                        return
-                except OSError, x:
-                        # File exists
-                        if x.errno == 17:
-                                time.sleep(1)
-                                timeout -= 1
-                        else:
-                                msg(&quot;Writing external file %s failed&quot; % fname)
-                                raise
-        msg(&quot;Timeout in writing external file %s&quot; % fname)
-        raise Exception(&quot;Timeout in writing external file %s&quot; % fname)
         </diff>
      <filename>node/disconode/external.py</filename>
    </modified>
    <modified>
      <diff>@@ -1,6 +1,5 @@
-import sys, time, os, traceback
-
-job_name = &quot;none&quot;
+import sys, time, os, traceback, fcntl
+from disco.util import msg, data_err, err
 
 def ensure_path(path, check_exists = True):
         if check_exists and os.path.exists(path):
@@ -18,3 +17,68 @@ def ensure_path(path, check_exists = True):
                         pass
                 else:
                         raise x
+
+# About concurrent append operations: 
+#
+# Posix spec says:
+#
+# If the O_APPEND flag of the file status flags is set, the file
+# offset shall be set to the end of the file prior to each write and no
+# intervening file modification operation shall occur between changing the
+# file offset and the write operation.
+#
+# See also 
+# http://www.perlmonks.org/?node_id=486488
+#
+def safe_append(instream, outfile, timeout = 60):
+        outstream = file(outfile, &quot;a&quot;)
+        while timeout &gt; 0:
+                try:
+                        fcntl.flock(outstream, fcntl.LOCK_EX | fcntl.LOCK_NB)
+                        try:
+                                while True:
+                                        buf = instream.read(8192)
+                                        if not buf:
+                                                instream.close()
+                                                outstream.close()
+                                                return
+                                        outstream.write(buf)
+                        except Exception, x:
+                                # output file is inconsistent state
+                                # we must crash the job
+                                err(&quot;Updating file %s failed: %s&quot; %\
+                                        (outfile, x))
+                except IOError, x:
+                        # Python doc guides us to check both the
+                        # EWOULDBLOCK (11) and EACCES (13) errors
+                        if x.errno == 11 or x.errno == 13:
+                                time.sleep(1)
+                                timeout -= 1
+                        else:
+                                raise
+        data_err(&quot;Timeout when updating file %s&quot; % outfile, outfile)
+
+
+def ensure_file(fname, data = None, timeout = 60, mode = 500):
+        while timeout &gt; 0:
+                if os.path.exists(fname):
+                        return False
+                try:
+                        fd = os.open(fname + &quot;.partial&quot;,
+                                os.O_CREAT | os.O_EXCL | os.O_WRONLY, mode)
+                        if type(data) == str:
+                               os.write(fd, data)
+                        else:
+                               os.write(fd, data())
+                        os.close(fd)
+                        os.rename(fname + &quot;.partial&quot;, fname)
+                        return True
+                except OSError, x:
+                        # File exists
+                        if x.errno == 17:
+                                time.sleep(1)
+                                timeout -= 1
+                        else:
+                                data_err(&quot;Writing external file %s failed&quot;\
+                                        % fname, fname)
+        data_err(&quot;Timeout in writing external file %s&quot; % fname, fname)</diff>
      <filename>node/disconode/util.py</filename>
    </modified>
    <modified>
      <diff>@@ -2,9 +2,6 @@ import cStringIO, struct, time, sys, os
 from pycurl import *
 from disco.comm_httplib import CommException
 
-if &quot;nocurl&quot; in os.environ.get(&quot;DISCO_FLAGS&quot;, &quot;&quot;).lower().split():
-        raise Exception(&quot;nocurl&quot;)
-
 MAX_BUF = 1024**2
 MAX_RETRIES = 10
 dl_handle = None
@@ -45,14 +42,14 @@ def download(url, data = None, redir = False):
         return outbuf.getvalue()
 
 class CurlConn:
-        def __init__(self, url, start = None, end = None, handle = None):
+        def __init__(self, url, handle = None, expect = 200):
                 if handle:
                         self.handle = handle
                 else:
                         self.handle = Curl()
 
                 for i in range(MAX_RETRIES):
-                        self.init_handle(url, start, end)
+                        self.init_handle(url)
                         self.perform()
                         x, succ, fail = self.multi.info_read(1) 
                         if not fail:
@@ -71,18 +68,12 @@ class CurlConn:
                 code = self.handle.getinfo(HTTP_CODE)
                 if code == 0:
                         raise CommException(&quot;Couldn't receive http response&quot;)
-
-                if start == None:
-                        check_code(self.handle, 200)
-                else:
-                        check_code(self.handle, 206)
+                check_code(self.handle, expect)
                 
-        def init_handle(self, url, start, end):
+        def init_handle(self, url):
                 self.handle.setopt(URL, url)
                 self.handle.setopt(WRITEFUNCTION, self.write)
                 self.handle.setopt(HEADERFUNCTION, self.head)
-                if start != None:
-                        self.handle.setopt(RANGE, &quot;%d-%d&quot; % (start, end - 1))
                 self.multi = CurlMulti()
                 self.multi.add_handle(self.handle)
                 self.buf = &quot;&quot;
@@ -102,8 +93,15 @@ class CurlConn:
                         self.cont = num_handles
         
         def head(self, buf):
-                if buf.lower().startswith(&quot;content-length:&quot;):
+                buf = buf.lower()
+                if buf.startswith(&quot;content-length:&quot;):
                         self.length = int(buf.split(&quot;:&quot;)[1])
+                elif buf.startswith(&quot;location:&quot;):
+                        self.location = buf.split(&quot;:&quot;, 1)[1].strip()
+
+        def getheader(self, x):
+                if x == &quot;location&quot;:
+                        return self.location
 
         def write(self, buf):
                 self.body = True
@@ -119,23 +117,9 @@ class CurlConn:
         def disco_stats(self):
                 pass
 
-def open_remote(url, part = None, is_chunk = None):
+def open_remote(url, expect = 200):
         c = Curl()
-        if is_chunk:
-                pos = part * 8
-                buf = cStringIO.StringIO()
-                c.setopt(URL, url)
-                c.setopt(RANGE, &quot;%d-%d&quot; % (pos, pos + 15))
-                c.setopt(WRITEFUNCTION, buf.write)
-                c.perform()
-                check_code(c, 206)
-                start, end = struct.unpack(&quot;QQ&quot;, buf.getvalue())
-                if start == end:
-                        return 0, cStringIO.StringIO()        
-        else:
-                start = end = None
-        
-        conn = CurlConn(url, start, end, handle = c)
+        conn = CurlConn(url, handle = c, expect = expect)
         return conn.length, conn
 
 </diff>
      <filename>pydisco/disco/comm_curl.py</filename>
    </modified>
    <modified>
      <diff>@@ -33,7 +33,7 @@ def check_code(fd, expected):
         if fd.status != expected:
                 raise CommException(fd.status)
 
-def open_remote(url, part = None, is_chunk = False, data = None):
+def open_remote(url, data = None, expect = 200):
         try:
                 ext_host, ext_file = url[7:].split(&quot;/&quot;, 1)
                 ext_file = &quot;/&quot; + ext_file
@@ -51,30 +51,14 @@ def open_remote(url, part = None, is_chunk = False, data = None):
                         http = httplib.HTTPConnection(ext_host)
                         http_pool[ext_host] = http
 
-                if is_chunk:
-                        pos = part * 8
-                        rge = &quot;bytes=%d-%d&quot; % (pos, pos + 15)
-                        http.request(&quot;GET&quot;, ext_file, None, {&quot;Range&quot;: rge})
-                        fd = http.getresponse()
-                        check_code(fd, 206)
-                        
-                        start, end = struct.unpack(&quot;QQ&quot;, fd.read())
-                        if start == end:
-                                return 0, cStringIO.StringIO()
-                        else:
-                                rge = &quot;bytes=%d-%d&quot; % (start, end - 1)
-                        
-                        http.request(&quot;GET&quot;, ext_file, None, {&quot;Range&quot;: rge})
-                        fd = http.getresponse()
-                        check_code(fd, 206)
-                elif data:
+                if data:
                         http.request(&quot;POST&quot;, ext_file, data)
                         fd = http.getresponse()
-                        check_code(fd, 200)
+                        check_code(fd, expect)
                 else:
                         http.request(&quot;GET&quot;, ext_file, None)
                         fd = http.getresponse()
-                        check_code(fd, 200)
+                        check_code(fd, expect)
                
                 sze = fd.getheader(&quot;content-length&quot;)
                 if sze:
@@ -88,4 +72,4 @@ def open_remote(url, part = None, is_chunk = False, data = None):
                 # endless recursion if something went seriously wrong.
                 http.close()
                 del http_pool[ext_host]
-                return open_remote(url, part, is_chunk)
+                return open_remote(url, data)</diff>
      <filename>pydisco/disco/comm_httplib.py</filename>
    </modified>
    <modified>
      <diff>@@ -1,12 +1,7 @@
 import sys, re, os, marshal, cjson, time, cPickle
-from disco import func, util
+from disco import func, util, comm
 from netstring import *
 
-try:
-        import disco.comm_curl as comm
-except:
-        import disco.comm_httplib as comm
-
 class JobException(Exception):
         def __init__(self, msg, master, name):
                 self.msg = msg
@@ -73,13 +68,11 @@ class Disco(object):
         
         def oob_get(self, name, key):
                 try:
-                        r = self.request(&quot;/disco/ctrl/oob_get?name=%s&amp;key=%s&quot; %\
-                                (name, key), redir = True)
+                        return util.load_oob(self.host, name, key)
                 except comm.CommException, x:
                         if x.http_code == 404:
                                 raise KeyError(&quot;Unknown key or job name&quot;)
                         raise
-                return r
 
         def oob_list(self, name):
                 try:
@@ -197,7 +190,6 @@ class Job(object):
                     &quot;sort&quot;: False,
                     &quot;params&quot;: Params(),
                     &quot;mem_sort_limit&quot;: 256 * 1024**2,
-                    &quot;chunked&quot;: None,
                     &quot;ext_params&quot;: None,
                     &quot;status_interval&quot;: 100000,
                     &quot;required_modules&quot;: [],
@@ -234,6 +226,9 @@ class Job(object):
                 
                 if &quot;input_files&quot; in kw:
                         kw[&quot;input&quot;] = kw[&quot;input_files&quot;]
+
+                if &quot;chunked&quot; in kw:
+                        raise Exception(&quot;Argument 'chunked' is deprecated&quot;)
                 
                 if not &quot;input&quot; in kw:
                         raise Exception(&quot;input is required&quot;)
@@ -289,24 +284,46 @@ class Job(object):
                                         parsed_inputs.append(inp)
                         inputs = parsed_inputs
                 else:
-                        addr = []
+                        nr_maps = 0
+                        ext_inputs = []
+                        red_inputs = []
                         for inp in inputs:
                                 if type(inp) == list:
                                         raise Exception(&quot;Reduce doesn't &quot;\
                                                 &quot;accept redundant inputs&quot;)
-                                elif not inp.startswith(&quot;dir://&quot;):
-                                        addr.append(inp)
-
-                        if d(&quot;nr_reduces&quot;) == None and not addr:
-                                raise Exception(&quot;nr_reduces must match to &quot;\
-                                        &quot;the number of partitions in the &quot;\
-                                        &quot;input data&quot;)
-
-                        if d(&quot;nr_reduces&quot;) != 1 and addr: 
+                                elif inp.startswith(&quot;dir://&quot;):
+                                        if inp.endswith(&quot;.txt&quot;):
+                                                ext_inputs.append(inp)
+                                        else:
+                                                red_inputs.append(inp)
+                                else:
+                                        ext_inputs.append(inp)
+
+                        if ext_inputs and red_inputs:
+                                raise Exception(&quot;Can't mix partitioned &quot;\
+                                        &quot;inputs with other inputs&quot;)
+                        elif red_inputs:
+                                q = lambda x: int(x.split(&quot;:&quot;)[-1]) + 1
+                                nr_red = q(red_inputs[0])
+                                for x in red_inputs:
+                                        if q(x) != nr_red:
+                                                raise Exception(\
+                                                &quot;Number of partitions must &quot;\
+                                                &quot;match in all inputs&quot;)
+                                n = d(&quot;nr_reduces&quot;) or nr_red
+                                if n != nr_red:
+                                        raise Exception(
+                                        &quot;Specified nr_reduces = %d but &quot;\
+                                        &quot;number of partitions in the input &quot;\
+                                        &quot;is %d&quot; % (n, nr_red))
+                                kw[&quot;nr_reduces&quot;] = nr_red
+                                inputs = red_inputs
+                        elif d(&quot;nr_reduces&quot;) != 1:
                                 raise Exception(&quot;nr_reduces must be 1 when &quot;\
-                                        &quot;using external inputs without &quot;\
-                                        &quot;the map phase&quot;)
-                        nr_maps = 0
+                                        &quot;using non-partitioned inputs &quot;\
+                                        &quot;without the map phase&quot;)
+                        else:
+                                inputs = ext_inputs
                
                 req[&quot;input&quot;] = &quot; &quot;.join(inputs)
                 req[&quot;nr_maps&quot;] = str(nr_maps)
@@ -327,7 +344,6 @@ class Job(object):
                                 req[&quot;reduce&quot;] = marshal.dumps(
                                         kw[&quot;reduce&quot;].func_code)
                         nr_reduces = nr_reduces or min(max(nr_maps / 2, 1), 100)
-                        req[&quot;chunked&quot;] = &quot;True&quot;
                        
                         req[&quot;reduce_reader&quot;] =\
                                 marshal.dumps(d(&quot;reduce_reader&quot;).func_code)
@@ -338,16 +354,10 @@ class Job(object):
                                 req[&quot;reduce_init&quot;] = marshal.dumps(\
                                         kw[&quot;reduce_init&quot;].func_code)
                 else:
-                        nr_reduces = nr_reduces or 1
-                
+                        nr_reduces = nr_reduces or 0
+               
                 req[&quot;nr_reduces&quot;] = str(nr_reduces)
 
-                if d(&quot;chunked&quot;) != None:
-                        if d(&quot;chunked&quot;):
-                                req[&quot;chunked&quot;] = &quot;True&quot;
-                        elif &quot;chunked&quot; in req:
-                                del req[&quot;chunked&quot;]
-
                 if &quot;combiner&quot; in kw:
                         req[&quot;combiner&quot;] =\
                                 marshal.dumps(kw[&quot;combiner&quot;].func_code)
@@ -377,23 +387,33 @@ def result_iterator(results, notifier = None,\
                 else:
                         res.append(dir_url)
         
+        x, x, root = util.load_conf()
+
         for url in res:
                 if url.startswith(&quot;file://&quot;):
                         fname = url[7:]
                         fd = file(fname)
                         sze = os.stat(fname).st_size
-                else:
+                elif url.startswith(&quot;disco://&quot;):
                         host, fname = url[8:].split(&quot;/&quot;, 1)
                         if proxy:
                                 ext_host = proxy
                                 fname = &quot;/disco/node/%s/%s&quot; % (host, fname)
                         else:
                                 ext_host = host + &quot;:&quot; + util.HTTP_PORT
-                        sze, fd = comm.open_remote(&quot;http://%s/%s&quot; % (ext_host, fname))
+                        if util.resultfs_enabled:
+                                f = &quot;%s/data/%s&quot; % (root, fname)
+                                fd = file(f)
+                                sze = os.stat(f).st_size
+                        else:
+                                sze, fd = comm.open_remote(&quot;http://%s/%s&quot;\
+                                        % (ext_host, fname))
+                else:
+                        raise JobException(&quot;Invalid result url: %s&quot; % url)
 
                 if notifier:
                         notifier(url)
 
-                for x in reader(fd, fd.length, fname):
+                for x in reader(fd, sze, fname):
                         yield x
                 </diff>
      <filename>pydisco/disco/core.py</filename>
    </modified>
    <modified>
      <diff>@@ -1,13 +1,11 @@
 
 import re, os
 import sys, time, os, traceback
-
-try:
-        import disco.comm_curl as comm
-except:
-        import disco.comm_httplib as comm
+from disco import comm
 
 job_name = &quot;none&quot;
+resultfs_enabled =\
+        &quot;resultfs&quot; in os.environ.get(&quot;DISCO_FLAGS&quot;, &quot;&quot;).lower().split()
 
 def msg(m, c = 'MSG', job_input = &quot;&quot;):
         t = time.strftime(&quot;%y/%m/%d %H:%M:%S&quot;)
@@ -35,14 +33,14 @@ def load_conf():
         
         return os.environ.get(&quot;DISCO_MASTER_PORT&quot;, master.strip()),\
                os.environ.get(&quot;DISCO_PORT&quot;, port.strip()),\
-               os.environ.get(&quot;DISCO_ROOT&quot;, root.strip()) + &quot;/data/&quot;
+               os.environ.get(&quot;DISCO_ROOT&quot;, root.strip())
 
 
 def jobname(addr):
         if addr.startswith(&quot;disco:&quot;) or addr.startswith(&quot;http:&quot;):
                 return addr.strip(&quot;/&quot;).split(&quot;/&quot;)[-2]
         elif addr.startswith(&quot;dir:&quot;):
-                return addr.strip(&quot;/&quot;).split(&quot;/&quot;)[-1]
+                return addr.strip(&quot;/&quot;).split(&quot;/&quot;)[-2]
         else:
                 raise &quot;Unknown address: %s&quot; % addr
 
@@ -70,15 +68,45 @@ def disco_host(addr):
                 raise &quot;Unknown host specifier: %s&quot; % addr
 
 
-def parse_dir(dir_url, proxy = None):
-        x, x, host, mode, name = dir_url.split(&quot;/&quot;, 4)
+def parse_dir(dir_url, proxy = None, part_id = None):
+        x, x, host, name = dir_url.split(&quot;/&quot;, 3)
         if proxy:
-                url = &quot;http://%s/disco/node/%s/%s/&quot; % (proxy, host, name)
+                url = &quot;http://%s/disco/node/%s/%s&quot; % (proxy, host, name)
+        else:
+                url = &quot;http://%s:%s/%s&quot; % (host, HTTP_PORT, name)
+
+        if name.endswith(&quot;.txt&quot;):
+                if resultfs_enabled:
+                        r = file(&quot;%s/data/%s&quot; % (ROOT, name)).readlines()
+                else:
+                        r = comm.download(url).splitlines()
+        else:
+                b, max = name.split(&quot;/&quot;)[-1].split(&quot;:&quot;)
+                fl = len(max)
+                base = b[:len(b) - fl]
+                t = &quot;%s%%.%dd&quot; % (base, fl)
+                if part_id != None:
+                        r = [t % part_id]
+                else:
+                        r = [t % i for i in range(int(max) + 1)]
+
+        p = &quot;/&quot;.join(name.split(&quot;/&quot;)[:-1])
+        return [&quot;disco://%s/%s/%s&quot; % (host, p, x.strip()) for x in r]
+
+def load_oob(host, name, key):
+        url = &quot;%s/disco/ctrl/oob_get?name=%s&amp;key=%s&quot; % (host, name, key)
+        if resultfs_enabled:
+                sze, fd = comm.open_remote(url, expect = 302)
+                loc = fd.getheader(&quot;location&quot;)
+                fname = &quot;%s/data/%s&quot; % (ROOT, &quot;/&quot;.join(loc.split(&quot;/&quot;)[3:]))
+                try:
+                        return file(fname).read()
+                except Exception, x:
+                        raise comm.CommException(404, 
+                                &quot;OOB key (%s) not found at %s&quot; %\
+                                (key, fname))
         else:
-                url = &quot;http://%s:%s/%s/&quot; % (host, HTTP_PORT, name)
-        html = comm.download(url)
-        inputs = re.findall(&quot;&gt;(%s-(.+?)-.*?)&lt;/a&gt;&quot; % mode, html)
-        return [&quot;%s://%s/%s/%s&quot; % (prefix, host, name, x)\
-                        for x, prefix in inputs if &quot;.&quot; not in x]
+                return comm.download(url, redir = True)
+
 
-MASTER_PORT, HTTP_PORT, tmp = load_conf()
+MASTER_PORT, HTTP_PORT, ROOT = load_conf()</diff>
      <filename>pydisco/disco/util.py</filename>
    </modified>
    <modified>
      <diff>@@ -30,16 +30,17 @@ DEFAULT_NUFA_PORT = 9800
 def check_config(config, replicas = True):
         REQ = [&quot;nodes&quot;, &quot;volumes&quot;, &quot;master&quot;, &quot;config_dir&quot;]
         if replicas:
-            REQ.append(&quot;replicas&quot;)
+                REQ.append(&quot;replicas&quot;)
+                if config[&quot;replicas&quot;] &gt; len(config[&quot;nodes&quot;]):
+                        print &quot;replicas must be less than equal&quot;\
+                              &quot;to the number of nodes.&quot;
+                        print &quot;Check the config file.&quot;
+                        sys.exit(1)
         for k in REQ:
                 if k not in config:
                         print &quot;Required field '%s' is missing.&quot; % k
                         print &quot;Check the config file.&quot;
                         sys.exit(1)
-        if config[&quot;replicas&quot;] &gt; len(config[&quot;nodes&quot;]):
-                print &quot;replicas must be less than equal to the number of nodes.&quot;
-                print &quot;Check the config file.&quot;
-                sys.exit(1)
 
 def output_volume(f, name, type, subvol = None, options = {}):
         print &gt;&gt; f, &quot;volume %s&quot; % name
@@ -54,7 +55,8 @@ def output_volume(f, name, type, subvol = None, options = {}):
 def nodes_sect(f, config):
         print &gt;&gt; f, &quot;\n# -----\n# NODES\n# -----\n&quot;
         opt = {&quot;transport-type&quot;: &quot;tcp&quot;,
-               &quot;remote-port&quot;: config[&quot;port&quot;]}
+               &quot;remote-port&quot;: config[&quot;port&quot;],
+               &quot;ping-timeout&quot;: &quot;60&quot;}
         
         for node in config[&quot;nodes&quot;]:
                 print &gt;&gt; f, &quot;# -- %s\n&quot; % node
@@ -105,12 +107,16 @@ def server_sect(f, config, options, writevol):
                         output_volume(f, &quot;%s-posix&quot; % vol, &quot;storage/posix&quot;,
                                 [], {&quot;directory&quot;: dir})
                         subvol.append(vol)
-                        output_volume(f, &quot;%s-lock&quot; % vol, &quot;features/locks&quot;,
+                        output_volume(f, &quot;%s-lock&quot; % vol,
+                                &quot;features/locks&quot;,
                                 [&quot;%s-posix&quot; % vol])
-                        output_volume(f, vol, &quot;performance/read-ahead&quot;,
+                        output_volume(f, &quot;%s-readahead&quot; % vol,
+                                &quot;performance/read-ahead&quot;,
                                 [&quot;%s-lock&quot; % vol], {
-                                        &quot;page-size&quot;: &quot;512KB&quot;, 
-                                        &quot;page-count&quot;: &quot;4&quot;})
+                                &quot;page-size&quot;: &quot;512KB&quot;, 
+                                &quot;page-count&quot;: &quot;4&quot;})
+                        output_volume(f, vol, &quot;performance/io-threads&quot;,
+                                [&quot;%s-readahead&quot; % vol], {&quot;thread-count&quot;: &quot;32&quot;})
         else:
                 print &gt;&gt; f, &quot;# this is just a dummy volume as gluster&quot;
                 print &gt;&gt; f, &quot;# requires us to specify a subvolume for&quot;
@@ -118,7 +124,7 @@ def server_sect(f, config, options, writevol):
                 print &gt;&gt; f, &quot;# here.&quot;
                 path = os.path.abspath(config[&quot;config_dir&quot;])
                 output_volume(f, &quot;masterdummy&quot;, &quot;storage/posix&quot;,
-                        [], {&quot;directory&quot;: path})
+                        [], {&quot;directory&quot;: &quot;/tmp&quot;})
                 subvol = [&quot;masterdummy&quot;]
 
         print &gt;&gt; f, &quot;# -- server\n&quot; 
@@ -148,6 +154,10 @@ def create_nufa_config(config, path):
         if len(config[&quot;volumes&quot;]) &gt; 1:
             print &quot;Specify only one volume for results&quot;
             sys.exit(1)
+        if config[&quot;master&quot;] not in config[&quot;nodes&quot;]:
+            print &quot;Add your master node '%s' to the list of nodes&quot; %\
+                config[&quot;master&quot;]
+            sys.exit(1)
         
         config[&quot;port&quot;] = config.get(&quot;port&quot;, DEFAULT_NUFA_PORT)
 </diff>
      <filename>util/gluster_config.py</filename>
    </modified>
  </modified>
  <removed type="array">
    <removed>
      <filename>test/test_chunked.py</filename>
    </removed>
  </removed>
  <parents type="array">
    <parent>
      <id>e84effe473b7935d0766c484732c102c70e191c2</id>
    </parent>
  </parents>
  <author>
    <name>Ville Tuulos</name>
    <email>tuulos@nxfront.nokiapaloalto.com</email>
  </author>
  <url>http://github.com/tuulos/disco/commit/2ae75c30afee7a42d1bd73eb66f62334f44b104d</url>
  <id>2ae75c30afee7a42d1bd73eb66f62334f44b104d</id>
  <committed-date>2009-06-16T19:57:51-07:00</committed-date>
  <authored-date>2009-06-11T15:27:13-07:00</authored-date>
  <message>Support for resultfs finalized, new disk layout for intermediate results. Closes #9</message>
  <tree>291a8102874ed5c50589598823505b80ee738874</tree>
  <committer>
    <name>Ville Tuulos</name>
    <email>tuulos@nxfront.nokiapaloalto.com</email>
  </committer>
</commit>
