<?xml version="1.0" encoding="UTF-8"?>
<commit>
  <added type="array"/>
  <modified type="array">
    <modified>
      <diff>@@ -4,6 +4,8 @@
 -export([start_link/1, init/1, handle_call/3, handle_cast/2, 
         handle_info/2, terminate/2, code_change/3]).
 
+-include(&quot;task.hrl&quot;).
+
 start_link(Nodes) -&gt;
         error_logger:info_report([{&quot;Fair scheduler starts&quot;}]),
         case gen_server:start_link({local, scheduler},
@@ -17,7 +19,7 @@ init(Nodes) -&gt;
                 {ok, &quot;fifo&quot;} -&gt;
                         error_logger:info_report(
                                 [{&quot;Scheduler uses fifo policy&quot;}]),
-                        fair_scheduler_fifo_policy:start_link(Nodes)
+                        fair_scheduler_fifo_policy:start_link(Nodes);
                 _ -&gt;
                         error_logger:info_report(
                                 [{&quot;Scheduler uses fair policy&quot;}]),
@@ -33,7 +35,7 @@ handle_cast({update_nodes, NewNodes}, _) -&gt;
 
 handle_cast({new_task, Task}, Nodes) -&gt;
         JobName = Task#task.jobname,
-        JobPid = case ets:lookup(jobs, JobName) of
+        Job = case ets:lookup(jobs, JobName) of
                 [] -&gt;
                         {ok, JobPid} = fair_scheduler_job:start(
                                 Task#task.jobname, Task#task.from),
@@ -44,7 +46,7 @@ handle_cast({new_task, Task}, Nodes) -&gt;
                         JobPid;
                 [{_, JobPid}] -&gt; JobPid
         end,
-        gen_server:cast(JobPid, {new_task, Task}),
+        gen_server:cast(Job, {new_task, Task}),
         {noreply, Nodes}.
 
 handle_call({next_job, AvailableNodes}, _From, Nodes) -&gt;
@@ -56,13 +58,13 @@ next_job(AvailableNodes, Jobs, NotJobs) -&gt;
                 {ok, JobPid} -&gt; 
                         case fair_scheduler_job:next_task(
                                         JobPid, Jobs, AvailableNodes) of
-                                {ok, {Node, Task} = E} -&gt;
-                                        {ok, E};
+                                {ok, Task} -&gt;
+                                        {ok, Task};
                                 none -&gt;
                                         next_job(AvailableNodes,
                                                 Jobs, [JobPid|NotJobs])
                         end;
-                nojobs -&gt; nojobs;
+                nojobs -&gt; nojobs
         end.
 
 job_cast(Msg) -&gt;</diff>
      <filename>master/src/fair_scheduler.erl</filename>
    </modified>
    <modified>
      <diff>@@ -22,9 +22,9 @@ init(Nodes) -&gt;
         register(fairy, spawn_link(fun() -&gt; fairness_fairy(NumCores) end)),
         {ok, {gb_trees:empty(), [], NumCores}}.
 
-% messages starting with _ are not part of the public policy api
+% messages starting with 'priv' are not part of the public policy api
 
-handle_cast({_update_priorities, Priorities}, _, {Jobs, _, NC}) -&gt;
+handle_cast({priv_update_priorities, Priorities}, {Jobs, _, NC}) -&gt;
         % The Jobs tree may have changed while fairy was working.
         % Update only the elements that fairy knew about.
         NewJobs = lists:foldl(fun({JobPid, NewJob}, NJobs) -&gt;
@@ -42,12 +42,12 @@ handle_cast({_update_priorities, Priorities}, _, {Jobs, _, NC}) -&gt;
 
 % Cluster topology has changed. Inform the fairy about the new total
 % number of cores available.
-handle_cast({update_nodes, Nodes}, _, {Jobs, PrioQ, _}) -&gt;
+handle_cast({update_nodes, Nodes}, {Jobs, PrioQ, _}) -&gt;
         NumCores = lists:sum([C || {_, C} &lt;- Nodes]),
         fairy ! {update, NumCores},
         {noreply, {Jobs, PrioQ, NumCores}};
 
-handle_cast({new_job, JobPid, JobName}, _, {Jobs, PrioQ, NC}) -&gt;
+handle_cast({new_job, JobPid, JobName}, {Jobs, PrioQ, NC}) -&gt;
         InitialPrio = -1.0 / lists:max([gb_trees:size(Jobs), 1.0]),
         
         Job = #job{name = JobName, cputime = 0, prio = InitialPrio,
@@ -62,13 +62,13 @@ handle_call({next_job, _}, _, {{0, _}, _, _} = S) -&gt;
 
 % NotJobs lists all jobs that got 'none' reply from the fair_scheduler_job task
 % scheduler. We want to skip them.
-handle_call({next_job, NotJobs} _, {Jobs, PrioQ, NC}) -&gt;
+handle_call({next_job, NotJobs}, _, {Jobs, PrioQ, NC}) -&gt;
         {NextJob, RPrioQ} = dropwhile(PrioQ, [], NotJobs),
         {UJobs, UPrioQ} = bias_priority(
                 gb_trees:get(NextJob), RPrioQ, Jobs, NC),
         {reply, {ok, NextJob}, {UJobs, UPrioQ, NC}};
 
-handle_call(_get_jobs, _, {Jobs, _, _} = S) -&gt;
+handle_call(priv_get_jobs, _, {Jobs, _, _} = S) -&gt;
         {reply, {ok, Jobs}, S}.
 
 handle_info({'DOWN', _, _, JobPid, _}, {Jobs, PrioQ, NC}) -&gt;
@@ -78,7 +78,7 @@ handle_info({'DOWN', _, _, JobPid, _}, {Jobs, PrioQ, NC}) -&gt;
 dropwhile([JobPid|R], H, NotJobs) -&gt;
         case lists:member(JobPid, NotJobs) of
                 false -&gt; {JobPid, lists:reverse(H) ++ R};
-                true -&gt; dropwhile(R, [JobPid|H], NotJob)
+                true -&gt; dropwhile(R, [JobPid|H], NotJobs)
         end.
 
 % Bias priority is a cheap trick to estimate a new priority for a job that
@@ -95,8 +95,7 @@ bias_priority(Job, PrioQ, Jobs, NumCores) -&gt;
 
 % Insert an item to an already sorted list
 prioq_insert(Item, R) -&gt; prioq_insert(Item, R, []).
-prioq_insert({Prio, _} = Item, [], H) -&gt;
-        lists:reverse([Item|H]);
+prioq_insert(Item, [], H) -&gt; lists:reverse([Item|H]);
 prioq_insert({Prio, _} = Item, [{P, _} = E|R], H) when Prio &gt; P -&gt;
         prioq_insert(Item, R, [E|H]);
 prioq_insert(Item, L, H) -&gt;
@@ -114,11 +113,11 @@ fairness_fairy(NumCores) -&gt;
                 {ok, Alpha} = application:get_env(fair_scheduler_alpha),
                 update_priorities(Alpha, NumCores),
                 fairness_fairy(NumCores)
-        end
+        end.
 
 update_priorities(_, 0) -&gt; ok;
 update_priorities(Alpha, NumCores) -&gt;
-        {ok, Jobs} = gen_server:call(sched_policy, _get_jobs),
+        {ok, Jobs} = gen_server:call(sched_policy, priv_get_jobs),
         NumJobs = gb_trees:size(Jobs),
 
         % Get the status of each running job
@@ -135,7 +134,7 @@ update_priorities(Alpha, NumCores) -&gt;
         % Extra resources are shared equally among the needy 
         ExtraShare = lists:sum(Extra) / (NumJobs - length(Extra)),
 
-        gen_server:cast(sched_policy, {_update_priorities, lists:map(fun
+        gen_server:cast(sched_policy, {priv_update_priorities, lists:map(fun
                 ({Job, {NumTasks, NumRunning}}) -&gt;
                         MyShare = 
                                 if NumTasks &lt; Share -&gt;
@@ -151,18 +150,9 @@ update_priorities(Alpha, NumCores) -&gt;
                         Prio = Alpha * Deficit + (1 - Alpha) * Job#job.prio,
                         {Job#job.pid, Job#job{prio = Prio, bias = 0,
                                 cputime = Job#job.cputime + NumRunning}}
-        end, Stats)}),
-
-                        
-
-                
-
-
-
+        end, Stats)}).
 
+% callback stubs
+terminate(_Reason, _State) -&gt; {}.
 
-
-
-
-
-        
+code_change(_OldVsn, State, _Extra) -&gt; {ok, State}.</diff>
      <filename>master/src/fair_scheduler_fair_policy.erl</filename>
    </modified>
    <modified>
      <diff>@@ -16,10 +16,10 @@ start_link(Nodes) -&gt;
 init(_) -&gt;
         {ok, queue:new()}.
 
-handle_cast({update_nodes, _}, _, Q) -&gt;
+handle_cast({update_nodes, _}, Q) -&gt;
         {noreply, Q};
 
-handle_cast({new_job, JobPid, JobName}, _, Q) -&gt;
+handle_cast({new_job, JobPid, JobName}, Q) -&gt;
         erlang:monitor(process, JobPid),
         {noreply, queue:in(JobPid, Q)}.
 
@@ -32,7 +32,7 @@ dropwhile(Q, NotJobs) -&gt;
                         V = lists:member(Job, NotJobs),
                         if V -&gt;
                                 dropwhile(NQ, NotJobs);
-                        false -&gt;
+                        true -&gt;
                                 {ok, Job}
                         end;
                 {empty, _} -&gt; nojobs
@@ -43,7 +43,6 @@ handle_info({'DOWN', _, _, Job, _}, Q) -&gt;
 
 % unused
 
-
 terminate(_Reason, _State) -&gt; {}.
 
 code_change(_OldVsn, State, _Extra) -&gt; {ok, State}.</diff>
      <filename>master/src/fair_scheduler_fifo_policy.erl</filename>
    </modified>
    <modified>
      <diff>@@ -2,9 +2,11 @@
 -module(fair_scheduler_job).
 -behaviour(gen_server).
 
--export([start/0, init/1, next_task/3, handle_call/3, handle_cast/2, 
+-export([start/2, init/1, next_task/3, handle_call/3, handle_cast/2, 
         handle_info/2, terminate/2, code_change/3]).
 
+-include(&quot;task.hrl&quot;).
+
 start(JobName, JobCoord) -&gt;
         error_logger:info_report([{&quot;JobProc starts for&quot;, JobName}]),
         gen_server:start(fair_scheduler_job, JobCoord, []).
@@ -32,14 +34,14 @@ next_task(Job, Jobs, AvailableNodes) -&gt;
                                 nonodes -&gt;
                                         none
                         end
-        end
+        end.
 
 % Return an often empty subset of AvailableNodes that don't have any tasks 
 % assigned to them by any job.
 all_empty_nodes(_, []) -&gt; [];
 all_empty_nodes([], AvailableNodes) -&gt; AvailableNodes;
 all_empty_nodes([Job|Jobs], AvailableNodes) -&gt; 
-        get_empty(Jobs, gen_server:call(Job,
+        all_empty_nodes(Jobs, gen_server:call(Job,
                 {get_empty_nodes, AvailableNodes})).
 
 % Assign a new task to this job.
@@ -48,7 +50,7 @@ handle_cast({new_task, Task}, {Tasks, Running, Nodes}) -&gt;
         {noreply, {NewTasks, Running, Nodes}};
 
 % Cluster topology changed (see below).
-handle_cast({update_nodes, NewNodes}, {Tasks, Running, Nodes}) -&gt;
+handle_cast({update_nodes, NewNodes}, {Tasks, Running, _}) -&gt;
         NewTasks = reassign_tasks(Tasks, NewNodes),
         {noreply, {NewTasks, Running, NewNodes}};
 
@@ -69,7 +71,7 @@ handle_call({get_empty_nodes, AvailableNodes}, _, {Tasks, _, _} = S) -&gt;
         case gb_trees:get(nopref, Tasks) of
                 {0, _} -&gt;
                         {reply, empty_nodes(Tasks, AvailableNodes), S};
-                _ =&gt;
+                _ -&gt;
                         {reply, [], S}
         end; 
 
@@ -79,7 +81,7 @@ handle_call({schedule_local, AvailableNodes}, _, {Tasks, Running, Nodes}) -&gt;
         {reply, Reply, {UpdatedTasks, Running, Nodes}};
 
 % Secondary scheduling policy (see below).
-handle_call({schedule_remote, FreeNodes}, _, {Tasks, Running, Nodes} = S) -&gt;
+handle_call({schedule_remote, FreeNodes}, _, {Tasks, Running, Nodes}) -&gt;
         {Reply, UpdatedTasks} = schedule_remote(Tasks, FreeNodes),
         {reply, Reply, {UpdatedTasks, Running, Nodes}}.
 
@@ -108,7 +110,7 @@ schedule_local(Tasks, AvailableNodes) -&gt;
                         end;
                 % Local tasks found. Choose an AvailableNode that has the
                 % longest queue of tasks waiting. Pick the first task from it.
-                Nodes -&gt; pop_busiest_node(Tasks, AvailableNodes)
+                Nodes -&gt; pop_busiest_node(Tasks, Nodes)
         end.
 
 % Secondary task scheduling policy:
@@ -136,11 +138,11 @@ pop_busiest_node(Tasks, Nodes) -&gt;
 
 % return nodes that don't have any local tasks assigned to them
 empty_nodes(Tasks, AvailableNodes) -&gt;
-        filter_nodes(Tasks, AvailableNodes, false);
+        filter_nodes(Tasks, AvailableNodes, false).
 
 % return nodes that have at least one local tasks assigned to them
 datalocal_nodes(Tasks, AvailableNodes) -&gt;
-        filter_nodes(Tasks, AvailableNodes, true);
+        filter_nodes(Tasks, AvailableNodes, true).
 
 filter_nodes(Tasks, AvailableNodes, Local) -&gt;
         lists:filter(fun(Node) -&gt;
@@ -158,8 +160,8 @@ filter_nodes(Tasks, AvailableNodes, Local) -&gt;
 assign_task(Task, Tasks, Nodes) -&gt;
         findpref(Task#task.input, Task, Tasks, Nodes).
 
-findpref([], Task, Tasks, Nodes) -&gt;
-        {N, L} = gb_trees:get(nopref, Tasks)
+findpref([], Task, Tasks, _) -&gt;
+        {N, L} = gb_trees:get(nopref, Tasks),
         gb_trees:update(nopref, {N + 1, [Task|L]}, Tasks);
 
 findpref([{_, Node}|R], Task, Tasks, Nodes) -&gt;
@@ -167,14 +169,14 @@ findpref([{_, Node}|R], Task, Tasks, Nodes) -&gt;
                 none -&gt;
                         ValidNode = lists:member(Node, Nodes),
                         if ValidNode -&gt;
-                                gb_trees:insert(Node, {1, [Task]}, Tasks)
+                                gb_trees:insert(Node, {1, [Task]}, Tasks);
                         true -&gt;
-                                findpref(R, Task, Tasks)
+                                findpref(R, Task, Tasks, Nodes)
                         end;
                 {value, {N, L}} -&gt;
                         IsBlack = lists:member(Node, Task#task.taskblack),
                         if IsBlack -&gt;
-                                findpref(R, Task, Tasks);
+                                findpref(R, Task, Tasks, Nodes);
                         true -&gt;
                                 gb_trees:update(Node, {N + 1, [Task|L]}, Tasks)
                         end
@@ -187,14 +189,14 @@ reassign_tasks(Tasks, NewNodes) -&gt;
                 case gb_trees:lookup(Node, OTasks) of
                         {value, TList} -&gt;
                                 {gb_trees:delete(Node, OTasks),
-                                 gb_trees:insert(Node, TList, NTasks)}
+                                 gb_trees:insert(Node, TList, NTasks)};
                         none -&gt;
-                                {OTasks, NTasks};
+                                {OTasks, NTasks}
                 end
         end, {Tasks, gb_trees:empty()}, NewNodes),
 
-        lists:foldl(fun(Task, NTasks) -&gt;
-                assing_task(Task, NTasks)
+        lists:foldl(fun(Task, NTasks0) -&gt;
+                assign_task(Task, NTasks0, NewNodes)
         end, gb_trees:insert(nopref, {0, []}, NTasks),
                 lists:flatten([L || {_, L} &lt;- gb_trees:values(OTasks)])).
 </diff>
      <filename>master/src/fair_scheduler_job.erl</filename>
    </modified>
  </modified>
  <removed type="array"/>
  <parents type="array">
    <parent>
      <id>66e0ef22abf56c8445e3c0ce1c367c0d6f140dc7</id>
    </parent>
  </parents>
  <author>
    <name>Ville Tuulos</name>
    <email>tuulos@dxfront.(none)</email>
  </author>
  <url>http://github.com/tuulos/disco/commit/28fc84effdcaad6d21dfdee6202dd6d0aab032fe</url>
  <id>28fc84effdcaad6d21dfdee6202dd6d0aab032fe</id>
  <committed-date>2009-08-28T16:22:19-07:00</committed-date>
  <authored-date>2009-08-28T16:22:19-07:00</authored-date>
  <message>scheduler compiles, not tested</message>
  <tree>521e4b2b7f1816b720d980fee3585d6317187fc0</tree>
  <committer>
    <name>Ville Tuulos</name>
    <email>tuulos@dxfront.(none)</email>
  </committer>
</commit>
