Skip to content

Commit

Permalink
modified shard to container, added simple-search
Browse files Browse the repository at this point in the history
  • Loading branch information
ldsimonassi committed Feb 7, 2012
1 parent c3f8090 commit 20e6dcf
Show file tree
Hide file tree
Showing 8 changed files with 277 additions and 39 deletions.
6 changes: 3 additions & 3 deletions src/main/java/search/SearchBucketBolt.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import org.apache.log4j.Logger;

import search.model.Item;
import search.model.ItemsShard;
import search.model.ItemsContainer;
import search.utils.SerializationUtils;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
Expand All @@ -31,7 +31,7 @@ public class SearchBucketBolt implements IRichBolt {
int totalShards;
int base_id;
SerializationUtils su;
ItemsShard shard;
ItemsContainer shard;



Expand All @@ -47,7 +47,7 @@ public void prepare(@SuppressWarnings("rawtypes") Map stormConf,
String myId = context.getThisComponentId();
totalShards = context.getRawTopology().get_bolts().get(myId).get_common().get_parallelism_hint();
su = new SerializationUtils();
shard = new ItemsShard(10000);
shard = new ItemsContainer(10000);
}

private boolean isMine(int itemId) {
Expand Down
6 changes: 1 addition & 5 deletions src/main/java/search/model/Item.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public Item(long id, String title, double price) {
this.title= title;
this.price= price;
}

@Override
public boolean equals(Object obj) {
Item other= (Item)obj;
Expand All @@ -36,8 +36,4 @@ public int hashCode() {
public String toString() {
return "id:"+id+ " title: "+title+ " price:"+price;
}

public boolean greaterThan(Item itmB) {
return itmB.price<price;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,20 @@
import org.apache.log4j.Logger;

/**
*
* This is a single threaded class intended to store a small number of documents.
*
* @author ldsimonassi
* This is a single threaded class intended to store a number of documents that must fit in memory.
*/
public class ItemsShard {
public class ItemsContainer {
Logger log = Logger.getLogger(this.getClass());
HashMap<String, HashSet<Item>> index;
HashMap<Item, Item> myItems;
Set<Item> emptySet= Collections.emptySet();

public ItemsShard(int initialCapacity) {
public ItemsContainer(int initialCapacity) {
index = new HashMap<String, HashSet<Item>>(initialCapacity, 0.9f);
myItems= new HashMap<Item, Item>(initialCapacity, 0.9f);
}

public synchronized void add(Item i) {
public void add(Item i) {
if(myItems.containsKey(i))
update(i);

Expand All @@ -47,17 +45,15 @@ public synchronized void add(Item i) {


public void update(Item i) {
//TODO Implement a more efficient, but more complex update operation (if necessary).
// Update only if title changed
Item myItem= myItems.get(i);

if(myItem==null)
add(i);
else if(!i.title.equals(myItem.title)){
remove(i);
add(i);
}
}

public void remove(int itemId) {
Item i= new Item(itemId, "", 0);
remove(i);
Expand All @@ -83,7 +79,6 @@ public void remove(Item i) {
}
}


private List<String> getItemWords(Item i) {
ArrayList<String> ret = new ArrayList<String>();
StringTokenizer strTok = new StringTokenizer(i.title, " ", false);
Expand All @@ -93,15 +88,7 @@ private List<String> getItemWords(Item i) {
}
return ret;
}



Set<Item> emptySet= Collections.emptySet();
/**
* This method is highly concurrent
* @param word
* @return
*/
public Set<Item> getItemsContainingWord(String word) {
Set<Item> items= index.get(word);
if(items==null){
Expand All @@ -111,7 +98,6 @@ public Set<Item> getItemsContainingWord(String word) {
return items;
}


public Set<Item> getItemsContainingWords(String words){
log.debug("Query: ["+words+"]");
StringTokenizer strTok= new StringTokenizer(words, "-", false);
Expand All @@ -136,4 +122,4 @@ public Set<Item> getItemsContainingWords(String words){
}
return result;
}
}
}
52 changes: 52 additions & 0 deletions src/main/java/simplesearch/AnswerQueryBolt.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package simplesearch;

import java.util.List;
import java.util.Map;

import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;

import search.model.Item;
import search.utils.SerializationUtils;
import storm.utils.AbstractAnswerBolt;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Tuple;

public class AnswerQueryBolt extends AbstractAnswerBolt {
private static final long serialVersionUID = 1L;
SerializationUtils su;

@SuppressWarnings("rawtypes")
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
super.prepare(stormConf, context, collector);
this.su= new SerializationUtils();
}

@Override
@SuppressWarnings("unchecked")
public void execute(Tuple input) {
String origin= input.getString(0);
String requestId= input.getString(1);
List<Item> finalResult= su.fromByteArray(input.getBinary(2));

JSONArray list = new JSONArray();
for (Item item : finalResult) {
JSONObject obj= new JSONObject();
obj.put("title", item.title);
obj.put("id", item.id);
obj.put("price", item.price);
list.add(obj);
}
String json= JSONValue.toJSONString(list);
sendBack(origin, requestId, json);
}

@Override
protected int getDestinationPort() {
return 8082;
}
}
39 changes: 39 additions & 0 deletions src/main/java/simplesearch/QueriesSpout.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package simplesearch;

import java.util.Map;

import storm.utils.AbstractClientSpout;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;

public class QueriesSpout extends AbstractClientSpout {
private static final long serialVersionUID = 1L;

String queriesPullHost;
int maxPull;

@Override
public void open(@SuppressWarnings("rawtypes") Map conf, TopologyContext context,
SpoutOutputCollector collector) {
this.queriesPullHost= (String) conf.get("queries-pull-host");
try{
this.maxPull= Integer.parseInt((String)conf.get("max-pull"));
} catch(Exception ex){
this.maxPull= 1;
}

super.open(conf, context, collector);

}

@Override
protected String getPullHost() {
return queriesPullHost;
}

@Override
protected int getMaxPull() {
return maxPull;
}
}
120 changes: 120 additions & 0 deletions src/main/java/simplesearch/SearchBolt.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package simplesearch;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

import org.apache.log4j.Logger;

import search.model.Item;
import search.model.ItemsContainer;
import search.utils.SerializationUtils;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class SearchBolt implements IRichBolt {
private static final long serialVersionUID = 1L;

Logger log;
OutputCollector collector;
@SuppressWarnings("rawtypes")
Map stormConf;
TopologyContext context;
int currentShard;
int totalShards;
int base_id;
SerializationUtils su;
ItemsContainer shard;



@Override
public void prepare(@SuppressWarnings("rawtypes") Map stormConf,
TopologyContext context,
OutputCollector collector) {
log = Logger.getLogger(this.getClass());
this.stormConf= stormConf;
this.context= context;
this.collector= collector;
su = new SerializationUtils();
shard = new ItemsContainer(10000);
}

private boolean isMine(int itemId) {
int remain = itemId % totalShards;
return remain == currentShard;
}

@Override
public void execute(Tuple input) {
if(input.getSourceComponent().equals("read-item-data")){
//String origin= input.getString(0);
//String requestId= input.getString(1);
int itemId= input.getInteger(2);
if(isMine(itemId)){
log.debug("Mine! "+currentShard+"/"+totalShards);
byte[] ba = input.getBinary(3);
if(ba==null) {
log.debug("Removing item id:"+itemId);
shard.remove(itemId);
} else {
Item i= su.itemFromByteArray(ba);
log.debug("Updating item index: "+i);
shard.update(i);
}
}
return ;
}

// Get request routing information
String origin= input.getString(0);
String requestId= input.getString(1);
String query= input.getString(2);


// Execute query with local data scope
List<Item> results= executeLocalQuery(query, 5);
log.debug("Searching ["+ query +"] in shard "+currentShard +" "+results.size()+" results found");
// Send data to next step: Merger
collector.emit(new Values(origin, requestId, query, su.toByteArray(results)));
}

private List<Item> executeLocalQuery(String query, int quantity) {
List<Item> items= new ArrayList<Item>(shard.getItemsContainingWords(query));

Collections.sort(items, new Comparator<Item>() {
@Override
public int compare(Item o1, Item o2) {
double diff= o1.price-o2.price;
if(diff>0)
return 1;
else
return -1;
}
});

if(items.size()>quantity)
items = items.subList(0, quantity-1);
return items;
}

@Override
public void cleanup() {
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("origin", "requestId", "query", "shardMatches"));
}




}
45 changes: 45 additions & 0 deletions src/main/java/simplesearch/SimpleSearchEngineTopologyStarter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package simplesearch;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class SimpleSearchEngineTopologyStarter {
public static StormTopology createTopology() {
TopologyBuilder builder= new TopologyBuilder();

builder.setSpout("queries-spout", new QueriesSpout(), 1);
builder.setBolt("queries-processor", new SearchBolt(), 1).allGrouping("queries-spout");
builder.setBolt("answer-query", new AnswerQueryBolt(), 2).allGrouping("queries-processor");
return builder.createTopology();
}

public static Config createConf(String queriesPullHost, String feedPullHost, String itemsApiHost, int maxPull) {
// Custom configuration
Config conf= new Config();
conf.put("queries-pull-host", queriesPullHost);
conf.put("items-api-host", itemsApiHost);
conf.put("max-pull", "100");
// Disable ackers mechanismo for this topology which doesn't need to be safe.
conf.put(Config.TOPOLOGY_ACKERS, 0);
return conf;
}

public static void main(String[] args) {
if(args.length < 5) {
System.err.println("Incorrect parameters. Use: <name> <queries-pull-host> <feed-pull-host> <items-api-host> <max-pulling>");
System.exit(-1);
}

System.out.println("Topology Name ["+args[0]+"]");
try {
Config conf= createConf(args[1], args[2], args[3], Integer.valueOf(args[4]));
conf.setNumWorkers(20);
StormSubmitter.submitTopology(args[0], conf, createTopology());
} catch (Exception e) {
e.printStackTrace();
}
}
}
Loading

0 comments on commit 20e6dcf

Please sign in to comment.