Skip to content
This repository
Browse code

accumulate multi-keep mapred query outputs more efficiently

as with single-keep, avoid repeated list appending, by building the
result list backward with cons
  • Loading branch information...
commit 98fbf04a18fd34f2bcfd82e5b1a5b62ee2c0a4ad 1 parent f66a9e9
Bryan Fink authored May 25, 2012

Showing 1 changed file with 17 additions and 6 deletions. Show diff stats Hide diff stats

  1. 23  src/riakc_pb_socket.erl
23  src/riakc_pb_socket.erl
@@ -1434,9 +1434,7 @@ wait_for_mapred_one(ReqId, Timeout, Phase, Acc) ->
1434 1434
                                 acc_mapred_one(Res, Acc));
1435 1435
         {mapred, NewPhase, Res} ->
1436 1436
             %% results from a new phase have arrived - track them all
1437  
-            Dict = orddict:from_list(
1438  
-                     [{NewPhase, Res}
1439  
-                      |finish_mapred_one(Phase, Acc)]),
  1437
+            Dict = [{NewPhase, Res},{Phase, Acc}],
1440 1438
             wait_for_mapred_many(ReqId, Timeout, Dict);
1441 1439
         {error, _}=Error ->
1442 1440
             Error;
@@ -1457,16 +1455,29 @@ finish_mapred_one(Phase, Acc) ->
1457 1455
 wait_for_mapred_many(ReqId, Timeout, Acc) ->
1458 1456
     case receive_mapred(ReqId, Timeout) of
1459 1457
         done ->
1460  
-            {ok, orddict:to_list(Acc)};
  1458
+            {ok, finish_mapred_many(Acc)};
1461 1459
         {mapred, Phase, Res} ->
1462 1460
             wait_for_mapred_many(
1463  
-              ReqId, Timeout, orddict:append_list(Phase, Res, Acc));
  1461
+              ReqId, Timeout, acc_mapred_many(Phase, Res, Acc));
1464 1462
         {error, _}=Error ->
1465 1463
             Error;
1466 1464
         timeout ->
1467  
-            {error, {timeout, orddict:to_list(Acc)}}
  1465
+            {error, {timeout, finish_mapred_many(Acc)}}
1468 1466
     end.
1469 1467
 
  1468
+%% Many-phase outputs are kepts as a proplist of reversed lists of
  1469
+%% results.
  1470
+acc_mapred_many(Phase, Res, Acc) ->
  1471
+    case lists:keytake(Phase, 1, Acc) of
  1472
+        {value, {Phase, PAcc}, RAcc} ->
  1473
+            [{Phase,acc_mapred_one(Res,PAcc)}|RAcc];
  1474
+        false ->
  1475
+            [{Phase,acc_mapred_one(Res,[])}|Acc]
  1476
+    end.
  1477
+
  1478
+finish_mapred_many(Acc) ->
  1479
+    [ {P, lists:reverse(A)} || {P, A} <- lists:keysort(1, Acc) ].
  1480
+
1470 1481
 %% Receive one mapred message.
1471 1482
 -spec receive_mapred(reference(), timeout()) ->
1472 1483
          done | {mapred, integer(), [term()]} | {error, term()} | timeout.

0 notes on commit 98fbf04

Please sign in to comment.
Something went wrong with that request. Please try again.