<?xml version="1.0" encoding="UTF-8"?>
<commit>
  <added type="array"/>
  <modified type="array">
    <modified>
      <diff>@@ -14,6 +14,7 @@
 -define(MAX_MSG_LENGTH, 8192).
 -define(RATE_WINDOW, 100000). % 100ms
 -define(RATE_LIMIT, 10).
+-define(ERRLINES_MAX, 100).
 -define(OOB_MAX, 1000).
 -define(OOB_KEY_MAX, 256).
 
@@ -169,6 +170,8 @@ worker_exit(#state{id = Id, master = Master}, Msg) -&gt;
         gen_server:cast(Master, {exit_worker, Id, Msg}),
         normal.
 
+errlines(#state{errlines = L}) -&gt; lists:flatten(lists:reverse(L)).
+
 handle_info({_, {data, {eol, &lt;&lt;&quot;**&lt;PID&gt;&quot;, Line/binary&gt;&gt;}}}, S) -&gt;
         {noreply, S#state{child_pid = binary_to_list(Line)}}; 
 
@@ -178,12 +181,10 @@ handle_info({_, {data, {eol, &lt;&lt;&quot;**&lt;MSG&gt;&quot;, Line0/binary&gt;&gt;}}}, S) -&gt;
         true -&gt;
                 Line = Line0
         end,
-
+        event(S, &quot;&quot;, strip_timestamp(Line)),
         T = now(),
         D = timer:now_diff(T, S#state.last_msg),
-        event(S, &quot;&quot;, strip_timestamp(Line)),
         S1 = S#state{last_msg = T, linecount = S#state.linecount + 1},
-        
         if D &gt; ?RATE_WINDOW -&gt;
                 {noreply, S1#state{msg_counter = 1}};
         S1#state.msg_counter &gt; ?RATE_LIMIT -&gt;
@@ -201,7 +202,7 @@ handle_info({_, {data, {eol, &lt;&lt;&quot;**&lt;ERR&gt;&quot;, Line/binary&gt;&gt;}}}, S) -&gt;
 
 handle_info({_, {data, {eol, &lt;&lt;&quot;**&lt;DAT&gt;&quot;, Line/binary&gt;&gt;}}}, S) -&gt;
         M = strip_timestamp(Line),
-        event(S, &quot;WARN&quot;, M ++ [10] ++ S#state.errlines),
+        event(S, &quot;WARN&quot;, M ++ [10] ++ errlines(S)),
         {stop, worker_exit(S, {data_error, M}), S};
 
 handle_info({_, {data, {eol, &lt;&lt;&quot;**&lt;OUT&gt;&quot;, Line/binary&gt;&gt;}}}, S) -&gt;
@@ -231,55 +232,48 @@ handle_info({_, {data, {eol, &lt;&lt;&quot;**&lt;OOB&gt;&quot;, Line/binary&gt;&gt;}}}, S) -&gt;
                 {noreply, S1}
         end;
 
-handle_info({_, {data, {eol, &lt;&lt;&quot;**&quot;, _/binary&gt;&gt; = Line}}}, S) -&gt;
-        event(S, &quot;WARN&quot;, &quot;Unknown line ID: &quot; ++ binary_to_list(Line)),
-        {noreply, S};               
+handle_info({_, {data, {eol, _}}}, S) when
+                length(S#state.errlines) &gt; ?ERRLINES_MAX -&gt;
+        Err = &quot;Worker failed (too much output on stdout):\n&quot; ++
+                        errlines(S),
+        event(S, &quot;ERROR&quot;, Err),
+        {stop, worker_exit(S, {job_error, Err}), S};
 
 handle_info({_, {data, {eol, Line}}}, S) -&gt;
-        {noreply, S#state{errlines = S#state.errlines 
-                ++ binary_to_list(Line) ++ [10]}};
+        {noreply, S#state{errlines =
+                [[binary_to_list(Line), 10]|S#state.errlines]}};
 
 handle_info({_, {data, {noeol, Line}}}, S) -&gt;
-        event(S, &quot;WARN&quot;, &quot;Truncated line: &quot; ++ binary_to_list(Line)),
-        {noreply, S};
+        {noreply, S#state{errlines =
+                [[binary_to_list(Line), &quot;...&quot;, 10]|S#state.errlines]}};
 
 handle_info({_, {exit_status, _Status}}, #state{linecount = 0} = S) -&gt;
-        M =  &quot;Worker didn't start:\n&quot; ++ S#state.errlines,
+        M =  &quot;Worker didn't start:\n&quot; ++ errlines(S),
         event(S, &quot;WARN&quot;, M),
-        %gen_server:cast(S#state.master, {exit_worker, S#state.task,
-        %        {data_error, {M, S#state.task}}}),
         {stop, worker_exit(S, {data_error, M}), S};
 
 handle_info({_, {exit_status, _Status}}, S) -&gt;
-        M =  &quot;Worker failed. Last words:\n&quot; ++ S#state.errlines,
+        M =  &quot;Worker failed. Last words:\n&quot; ++ errlines(S),
         event(S, &quot;ERROR&quot;, M),
-        %gen_server:cast(S#state.master,
-        %        {exit_worker, S#state.task, {job_error, M}}),
         {stop, worker_exit(S, {job_error, M}), S};
         
 handle_info({_, closed}, S) -&gt;
-        M = &quot;Worker killed. Last words:\n&quot; ++ S#state.errlines,
+        M = &quot;Worker killed. Last words:\n&quot; ++ errlines(S),
         event(S, &quot;ERROR&quot;, M),
-        %gen_server:cast(S#state.master, 
-        %        {exit_worker, S#state.task, {job_error, M}}),
         {stop, worker_exit(S, {job_error, M}), S};
 
 handle_info(timeout, #state{linecount = 0} = S) -&gt;
         M = &quot;Worker didn't start in 30 seconds&quot;,
         event(S, &quot;WARN&quot;, M),
-        %gen_server:cast(S#state.master, {exit_worker, S#state.id,
-        %        {data_error, {M, S#state.task}}}),
         {stop, worker_exit(S, {data_error, M}), S};
 
 handle_info({'DOWN', _, _, _, _}, S) -&gt;
-        M = &quot;Worker killed. Last words:\n&quot; ++ S#state.errlines,
+        M = &quot;Worker killed. Last words:\n&quot; ++ errlines(S),
         event(S, &quot;ERROR&quot;, M),
-        %gen_server:cast(S#state.master, 
-        %        {exit_worker, S#state.task, {job_error, M}}),
         {stop, worker_exit(S, {job_error, M}), S}.
 
 handle_cast(kill_worker, S) -&gt; 
-        M = &quot;Worker killed. Last words:\n&quot; ++ S#state.errlines,
+        M = &quot;Worker killed. Last words:\n&quot; ++ errlines(S),
         event(S, &quot;ERROR&quot;, M),
         {stop, worker_exit(S, {job_error, M}), S}.
 </diff>
      <filename>master/src/disco_worker.erl</filename>
    </modified>
    <modified>
      <diff>@@ -6,6 +6,7 @@ def check_dead(job):
         if job.jobinfo()['active'] == &quot;dead&quot;:
                 job.purge()
         else:
+                job.purge()
                 raise Exception(&quot;Rate limit failed&quot;)
 
 def data_gen(path):
@@ -18,6 +19,11 @@ def fun_map(e, params):
 def fun_map2(e, params):
         return []
 
+def fun_map3(e, params):
+        for i in range(100000):
+            print &quot;foobar&quot;
+        return []
+
 tserver.run_server(data_gen)
 inputs = tserver.makeurl([1])
 job = Disco(sys.argv[1]).new_job(name = &quot;test_ratelimit&quot;,
@@ -33,6 +39,12 @@ time.sleep(5)
 check_dead(job)
 
 job = Disco(sys.argv[1]).new_job(name = &quot;test_ratelimit3&quot;,
+        input = inputs, map = fun_map3, status_interval = 1)
+
+time.sleep(5)
+check_dead(job)
+
+job = Disco(sys.argv[1]).new_job(name = &quot;test_ratelimit4&quot;,
         input = inputs, map = fun_map2, status_interval = 0)
 job.wait()
 job.purge()</diff>
      <filename>test/test_ratelimit.py</filename>
    </modified>
  </modified>
  <removed type="array"/>
  <parents type="array">
    <parent>
      <id>7635c9a39d8394be817ae03a9d0400fca87de140</id>
    </parent>
  </parents>
  <author>
    <name>Ville Tuulos</name>
    <email>tuulos@dxfront.(none)</email>
  </author>
  <url>http://github.com/tuulos/disco/commit/db76c80fc293edecc3102420abe51c6a3f7f291f</url>
  <id>db76c80fc293edecc3102420abe51c6a3f7f291f</id>
  <committed-date>2009-10-04T20:54:20-07:00</committed-date>
  <authored-date>2009-10-04T20:54:20-07:00</authored-date>
  <message>Apply rate limit to all messages on stdout / stderr. Closes #21</message>
  <tree>138e84e24e912f081c4913e44f2c6730c7b51899</tree>
  <committer>
    <name>Ville Tuulos</name>
    <email>tuulos@dxfront.(none)</email>
  </committer>
</commit>
