Skip to content

Commit

Permalink
Fixed a couple of issues with the Sensei group-by map-reduce
Browse files Browse the repository at this point in the history
  • Loading branch information
vzhabiuk committed Apr 10, 2013
1 parent 57cd40b commit 3acc086
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutionException;

import javax.management.MBeanServer;
import javax.management.ObjectName;
Expand All @@ -20,6 +21,7 @@

import com.browseengine.bobo.api.FacetSpec;
import com.linkedin.norbert.NorbertException;
import com.linkedin.norbert.cluster.ClusterDisconnectedException;
import com.linkedin.norbert.javacompat.cluster.ClusterClient;
import com.linkedin.norbert.javacompat.cluster.Node;
import com.linkedin.norbert.javacompat.network.PartitionedNetworkClient;
Expand Down Expand Up @@ -49,6 +51,9 @@ public class SenseiBroker extends AbstractConsistentHashBroker<SenseiRequest, Se

private final boolean allowPartialMerge;
private final ClusterClient clusterClient;

private volatile boolean disconnected;

private static Counter numberOfNodesInTheCluster = Metrics.newCounter(new MetricName(SenseiBroker.class, "numberOfNodesInTheCluster"));
public SenseiBroker(PartitionedNetworkClient<String> networkClient, ClusterClient clusterClient, boolean allowPartialMerge)
throws NorbertException {
Expand Down Expand Up @@ -256,6 +261,17 @@ public int getNumberOfNodes() {
}
return count;
}


/**
 * Delegates the request to the superclass implementation and records
 * cluster disconnection.
 *
 * <p>If the superclass call fails with a {@link ClusterDisconnectedException},
 * the {@code disconnected} flag is set to {@code true} and the same exception
 * is rethrown unchanged.
 *
 * @param req the request to dispatch
 * @return the results returned by the superclass call
 * @throws ExecutionException as declared by the overridden method
 */
@Override
protected List<SenseiResult> doCall(SenseiRequest req) throws ExecutionException {
  final List<SenseiResult> callResults;
  try {
    callResults = super.doCall(req);
  } catch (ClusterDisconnectedException disconnectFailure) {
    // Remember the failure so isDisconnected() reports it, then propagate it as-is.
    this.disconnected = true;
    throw disconnectFailure;
  }
  return callResults;
}
/**
 * Reports whether this broker has recorded a cluster disconnection.
 *
 * @return the current value of the {@code disconnected} flag, which
 *         {@code doCall} sets to {@code true} when it catches a
 *         {@code ClusterDisconnectedException}
 */
public boolean isDisconnected() {
  return this.disconnected;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,16 @@ public void setLoadBalancerFactory(PartitionedLoadBalancerFactory<String> loadBa
/**
 * Stores the given flag in this object's {@code allowPartialMerge} field.
 *
 * NOTE(review): presumably this value is handed to the broker built from this
 * configuration - confirm against the builder method.
 *
 * @param allowPartialMerge the new value of the flag
 */
public void setAllowPartialMerge(boolean allowPartialMerge) {
this.allowPartialMerge = allowPartialMerge;
}

/**
 * Releases the cluster client and the network client held by this configuration.
 *
 * <p>The cluster client reference on {@code networkClientConfig} is cleared first.
 * The cluster client is then shut down, and the network client is shut down in a
 * {@code finally} block so it is released even if the cluster client fails to
 * shut down. Both fields are always reset to {@code null}, including when a
 * {@code shutdown()} call throws, so a partially closed client is never kept.
 *
 * <p>A client field that is already {@code null} is skipped instead of being
 * dereferenced, so a repeated call does not fail with a
 * {@link NullPointerException} on those fields.
 */
public void shutdown() {
  networkClientConfig.setClusterClient(null);
  try {
    if (clusterClient != null) {
      clusterClient.shutdown();
    }
  } finally {
    clusterClient = null;
    try {
      if (networkClient != null) {
        networkClient.shutdown();
      }
    } finally {
      // Drop the reference even when shutdown() throws; the client is unusable either way.
      networkClient = null;
    }
  }
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public static void decorateWithMapReduce(JSONObject jsonObj, java.util.List<Pair
int countSum = 0;
int top = groupBy.optInt("top");
for (Pair<String, String> pair: aggreagationFunctions) {
if (columns.length() == 1 && "sum".equals(pair.getFirst()) && countSum == 0) {
if (columns.length() == 1 && "sum".equalsIgnoreCase(pair.getFirst()) && countSum == 0) {
countSum++;

JSONObject facetSpec = new FastJSONObject().put("expand", false)
Expand All @@ -59,7 +59,7 @@ public static void decorateWithMapReduce(JSONObject jsonObj, java.util.List<Pair
jsonObj.put("facets", new FastJSONObject());
}
jsonObj.getJSONObject("facets").put(SenseiFacetHandlerBuilder.SUM_GROUP_BY_FACET_NAME, facetSpec);
} else if (columns.length() == 1 && "count".equals(pair.getFirst()) ) {
} else if (columns.length() == 1 && "count".equalsIgnoreCase(pair.getFirst()) ) {
JSONObject facetSpec = new FastJSONObject().put("expand", false)
.put("minhit", 0)
.put("max", top);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ public void merge(GroupedValue anotherValue) {
count += ((CountGroupedValue) anotherValue).count;
}

/**
 * Renders this value for debugging.
 *
 * @return a string of the form {@code CountGroupedValue [count=N]}, where
 *         {@code N} is the current {@code count}
 */
@Override
public String toString() {
  StringBuilder text = new StringBuilder("CountGroupedValue [count=");
  text.append(count);
  text.append(']');
  return text.toString();
}

}

public static class CountAggregationFunction implements AggregateFunction<CountGroupedValue> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ public MapResult(int initialCapacity, TermValueList[] dictionaries, BoboIndexRea
public Long2ObjectOpenHashMap<GroupedValue> results;
public TermValueList[] dictionaries;
public BoboIndexReader indexReader;
/**
 * Renders this map result for debugging.
 *
 * @return a string containing the {@code results} map followed by the
 *         element-wise rendering of the {@code dictionaries} array;
 *         {@code indexReader} is not included
 */
@Override
public String toString() {
  StringBuilder text = new StringBuilder("MapResult [results=");
  text.append(results);
  text.append(", dictionaries=");
  text.append(java.util.Arrays.toString(dictionaries));
  text.append("]");
  return text.toString();
}

}

public class GroupByMapReduceJob implements SenseiMapReduce<Serializable, HashMap<String, GroupedValue>> {
Expand Down Expand Up @@ -106,8 +111,8 @@ public Serializable map(IntArray docIds, int docIdCount, long[] uids, FieldAcces
}
}

if (mapResult.results.size() > TRIM_SIZE * 20) {
trimToSize(mapResult.results, TRIM_SIZE * 5);
if (mapResult.results.size() > Math.max(TRIM_SIZE, top) * 20) {
trimToSize(mapResult.results, Math.max(TRIM_SIZE, top) * 5);
}

return mapResult;
Expand Down Expand Up @@ -154,7 +159,9 @@ private String decodeKey(String[] str, TermValueList[] dictionaries, int[] numBi

@Override
public List<Serializable> combine(List<Serializable> mapResults, CombinerStage combinerStage) {
/*
// System.out.println("Combine - " + mapResults);

/*
* if (combinerStage == CombinerStage.partitionLevel) { if (map == null)
* { return Collections.EMPTY_LIST; } trimToSize(map, TRIM_SIZE * 5);
* List<HashMap<String, GroupedValue>> ret =
Expand All @@ -173,8 +180,20 @@ public List<Serializable> combine(List<Serializable> mapResults, CombinerStage c
for (int i = 0; i < mapResults.size(); i++) {
MapResult current = (MapResult) mapResults.get(i);
if (results.get(current.indexReader) != null) {
results.get(current.indexReader).results.putAll(current.results);
trimToSize(current.results, TRIM_SIZE);

Long2ObjectOpenHashMap<GroupedValue> currentMergedResults = results.get(current.indexReader).results;

Long2ObjectOpenHashMap<GroupedValue> currentResultsToMerge = current.results;
for (long key : currentResultsToMerge.keySet()) {
GroupedValue groupedValue = currentMergedResults.get(key);
if (groupedValue != null) {
groupedValue.merge(currentResultsToMerge.get(key));
} else {
currentMergedResults.put(key, currentResultsToMerge.get(key));
}
}
// .putAll(currentResultsToMerge);
trimToSize(currentResultsToMerge, Math.max(TRIM_SIZE, top));
} else {
results.put(current.indexReader, current);
}
Expand All @@ -188,6 +207,7 @@ public List<Serializable> combine(List<Serializable> mapResults, CombinerStage c
}

}
// System.out.println("End combine - " + ret);
return java.util.Arrays.asList((Serializable)ret);
}
if (mapResults.size() == 0) {
Expand All @@ -201,7 +221,7 @@ public List<Serializable> combine(List<Serializable> mapResults, CombinerStage c
for (int i = 1; i < mapResults.size(); i++) {
merge(firstMap, (HashMap<String, GroupedValue>) mapResults.get(i));
}
trimToSize(firstMap, TRIM_SIZE);
trimToSize(firstMap, Math.max(TRIM_SIZE, top));
return java.util.Arrays.asList((Serializable)firstMap);

}
Expand Down Expand Up @@ -372,7 +392,7 @@ public JSONObject render(HashMap<String, GroupedValue> reduceResult) {
JSONArray jsonArrResult = (JSONArray) result;
if (jsonArrResult.length() > top) {
JSONArray newArr = new JSONUtil.FastJSONArray();
for (int i = 0; i <= top; i++) {
for (int i = 0; i < top; i++) {
newArr.put(jsonArrResult.get(i));
}
jsonArrResult = newArr;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public class SingleNodeStarter {
private static Server jettyServer;
private static SenseiServer server;
private static SenseiBroker senseiBroker;
private static BrokerConfig brokerConfig;

public static void start(String localPath, int expectedDocs) {
start(new File(getUri(localPath)), expectedDocs);
Expand All @@ -47,7 +48,7 @@ public void run() {
}
});
PartitionedLoadBalancerFactory balancerFactory = new SenseiPartitionedLoadBalancerFactory(50);
BrokerConfig brokerConfig = new BrokerConfig(senseiConfiguration, balancerFactory);
brokerConfig = new BrokerConfig(senseiConfiguration, balancerFactory);
brokerConfig.init();
senseiBroker = brokerConfig.buildSenseiBroker();
waitTillServerStarts(expectedDocs);
Expand All @@ -61,6 +62,13 @@ public static void waitTillServerStarts(int expectedDocs) throws SenseiException
int counter = 0;
while (true) {
SenseiResult senseiResult = senseiBroker.browse(new SenseiRequest());
if (senseiBroker.isDisconnected()) {
brokerConfig.shutdown();
Thread.sleep(5000);
brokerConfig.init();
senseiBroker = brokerConfig.buildSenseiBroker();
System.out.println("Restarted the broker");
}
int totalDocs = senseiResult.getTotalDocs();
System.out.println("TotalDocs = " + totalDocs);
if (counter > 200) {
Expand Down

0 comments on commit 3acc086

Please sign in to comment.