
Commit

#12

andyhehk committed Nov 17, 2013
1 parent 02c90bb commit 79b8fda
Showing 7 changed files with 59 additions and 43 deletions.
@@ -76,7 +76,7 @@ public void map(LongWritable key, Text value, Context context)
 
 if(tableName.equals("tweet")) {
 
-String tweet_id = (String) tweet_json.get("id");
+Long tweet_id = (Long) tweet_json.get("id");
 String text = (String) tweet_json.get("text");
 String create_at = (String) tweet_json.get("created_at");
 
@@ -91,7 +91,7 @@ public void map(LongWritable key, Text value, Context context)
 
 record.set("created_at", create_date);
 
-record.setFromString("id", tweet_id);
+record.setFromString("id", tweet_id.toString());
 record.setFromString("user_id", user_id.toString());
 record.setFromString("text", text);
 
@@ -105,7 +105,7 @@ public void map(LongWritable key, Text value, Context context)
 
 }
 else if(tableName.equals("entities")) {
-String tweet_id = (String) tweet_json.get("id");
+Long tweet_id = (Long) tweet_json.get("id");
 
 try {
 
@@ -115,7 +115,7 @@ else if(tableName.equals("entities")) {
 JSONArray hashtags = (JSONArray) entities_json.get("hashtags");
 
 if (hashtags.isEmpty()){
-record.setFromString("tweet_id", tweet_id);
+record.setFromString("tweet_id", tweet_id.toString());
 record.setFromString("hashtag", "");
 context.write(new Text(tableName), record);
 }
@@ -124,7 +124,7 @@ else if(tableName.equals("entities")) {
 for(Object tag : hashtags) {
 String tag_str = (String) tag;
 
-record.setFromString("tweet_id", tweet_id);
+record.setFromString("tweet_id", tweet_id.toString());
 record.setFromString("hashtag", tag_str);
 context.write(new Text(tableName), record);
 }
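Note: these four hunks are the same one-line fix repeated. json-simple's parser returns JSON numbers as java.lang.Long, so the old (String) cast on the tweet id threw a ClassCastException at runtime. A minimal sketch of the behavior (the class name and sample JSON below are illustrative, not from the commit):

import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;

// Illustrative sketch, not part of the commit: json-simple maps JSON
// integers to java.lang.Long, so a (String) cast on "id" fails at runtime.
public class TweetIdDemo {
    public static void main(String[] args) throws Exception {
        JSONObject tweet = (JSONObject) new JSONParser()
                .parse("{\"id\":123456789,\"text\":\"hello\"}");
        Long tweetId = (Long) tweet.get("id");   // works: value is a Long
        System.out.println(tweetId.toString()); // convert explicitly for string-based APIs
    }
}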
@@ -18,7 +18,7 @@
 public class HiveGiraphEngineDriver extends EngineDriver {
 
 private HiveConf hive_config;
-private static final Logger LOG = Logger.getLogger(HadoopEngineDriver.class);
+private static final Logger LOG = Logger.getLogger(HiveGiraphEngineDriver.class);
 private List<HiveGiraphRunnable> queries = new ArrayList<HiveGiraphRunnable>();
 
 private Connection connection;
@@ -29,6 +29,10 @@ public HiveGiraphEngineDriver(WorkflowInputFormat workIF) {
 hive_config = new HiveConf();
 hive_config.addResource(new Path(workIF.getHiveHome()
 + "/conf/hive-site.xml"));
+hive_config.addResource(new Path(workIF.getHadoopHome()
++ "/conf/core-site.xml"));
+hive_config.addResource(new Path(workIF.getHadoopHome()
++ "/conf/mapred-site.xml"));
 // TODO Auto-generated constructor stub
 }
 
@@ -73,6 +77,7 @@ public void init() {
 
 @Override
 public void run() {
+System.out.println("Running HiveGiraph queries!");
 for(HiveGiraphRunnable query : queries) {
 query.runGiraph(hive_config);
 }
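Note: besides fixing the copy-pasted logger class, the constructor now layers the Hadoop client configuration onto the HiveConf in addition to hive-site.xml, presumably so the embedded Giraph job sees the cluster's filesystem and MapReduce settings rather than local defaults. A sketch of the idea (the /opt/... paths stand in for WorkflowInputFormat's getters and are illustrative):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;

// Illustrative sketch: hiveHome/hadoopHome are placeholders for
// workIF.getHiveHome() / workIF.getHadoopHome() in the real driver.
public class ConfLayeringDemo {
    public static void main(String[] args) {
        String hiveHome = "/opt/hive";      // placeholder
        String hadoopHome = "/opt/hadoop";  // placeholder

        HiveConf conf = new HiveConf();
        conf.addResource(new Path(hiveHome + "/conf/hive-site.xml"));
        // Newly added in this commit: pick up the cluster's filesystem and
        // MapReduce settings instead of falling back to local defaults.
        conf.addResource(new Path(hadoopHome + "/conf/core-site.xml"));
        conf.addResource(new Path(hadoopHome + "/conf/mapred-site.xml"));
    }
}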
16 changes: 6 additions & 10 deletions qgen/src/main/java/bigframe/qgen/factory/BIDomainWorkflow.java
@@ -114,11 +114,7 @@ else if(dataVariety.contains(Constants.GRAPH)) {
 if(graphEngine.equals(Constants.HADOOP)) {
 
 }
 
-else if(graphEngine.equals(Constants.HIVE_GIRAPH)) {
-hivegiraphWorkflow.addQuery(new
-bigframe.workflows.BusinessIntelligence.graph.exploratory.WF_TwitterRank());
-}
 
 }
 
 else if(dataVariety.contains(Constants.NESTED)) {
@@ -153,12 +149,12 @@ else if(relationalEngine.equals(Constants.SPARK)) {
 }
 }
 
 else if(dataVariety.contains(Constants.GRAPH)) {
 
 // if(graphEngine.equals(Constants.HADOOP)) {
 // hadoopWorkflow.addQuery(new
 // bigframe.workflows.BusinessIntelligence.graph.exploratory.WF_PageRank(basePath.graph_path()));
 // }
+if(graphEngine.equals(Constants.HIVE_GIRAPH)) {
+hivegiraphWorkflow.addQuery(new
+bigframe.workflows.BusinessIntelligence.graph.exploratory.WF_TwitterRank());
+}
 }
 
 else if(dataVariety.contains(Constants.NESTED)) {
@@ -30,7 +30,7 @@ class WF_ReportSaleSentimentHive(basePath: BaseTablePath, num_iter: Int) extends
 * Tested on hive 0.9 - 0.12. No ORC file format is using.
 */
 def prepareHiveTablesImpl1(connection: Connection): Unit = {
 val itemHDFSPath = basePath.relational_path + "/item"
 val web_salesHDFSPath = basePath.relational_path + "/web_sales"
 val catalog_salesHDFSPath = basePath.relational_path + "/catalog_sales"
 val store_salesHDFSPath = basePath.relational_path + "/store_sales"
@@ -400,9 +400,6 @@ class WF_ReportSaleSentimentHive(basePath: BaseTablePath, num_iter: Int) extends
 val stmt = connection.createStatement();
 
 
-val add_JsonSerde = "ADD JAR "
-
-
 val dropWebSales = "DROP TABLE web_sales"
 val dropStoreSales = "DROP TABLE store_sales"
 val dropCatalogSales = "DROP TABLE catalog_sales"
@@ -784,9 +781,9 @@ class WF_ReportSaleSentimentHive(basePath: BaseTablePath, num_iter: Int) extends
 try {
 val stmt = connection.createStatement();
 
-val lower = 1
-val upper = 300
-
+// val lower = 1
+// val upper = 300
+//
 val drop_promotionSelected = "DROP TABLE IF EXISTS promotionSelected"
 val create_promotionSelected = "CREATE TABLE promotionSelected (promo_id string, item_sk int," +
 "start_date_sk int, end_date_sk int)"
@@ -935,6 +932,7 @@ class WF_ReportSaleSentimentHive(basePath: BaseTablePath, num_iter: Int) extends
 
 stmt.execute(drop_mentionProb)
 stmt.execute(create_mentionProb)
+stmt.execute(query_mentionProb)
 
 
 val drop_simUserByProd = "DROP VIEW IF EXISTS simUserByProd"
@@ -946,7 +944,7 @@ class WF_ReportSaleSentimentHive(basePath: BaseTablePath, num_iter: Int) extends
 " FROM mentionProb JOIN twitter_graph " +
 " ON mentionProb.user_id = twitter_graph.follower_id) f" +
 " JOIN mentionProb " +
-" ON f.friend_id = mentionProb.user_id) c"
+" ON f.friend_id = mentionProb.user_id"
 
 
 
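Note: these two hunks are behavioral fixes. The INSERT that populates mentionProb was built as a string but never executed, so the table stayed empty; and the simUserByProd query ended with a stray ") c" alias, apparently left over from an earlier subquery nesting, which Hive rejects as unbalanced. A minimal JDBC sketch of the execute-every-statement pattern (the connection URL and HiveQL strings are placeholders, not the workflow's exact ones):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

// Illustrative sketch: the URL and the queries are placeholders.
public class MentionProbDemo {
    public static void main(String[] args) throws Exception {
        Connection conn = DriverManager
                .getConnection("jdbc:hive2://localhost:10000/default");
        Statement stmt = conn.createStatement();
        stmt.execute("DROP TABLE IF EXISTS mentionProb");
        stmt.execute("CREATE TABLE mentionProb (user_id bigint, prob double)");
        // The fix: building the INSERT string is not enough; it must also be
        // executed, or mentionProb stays empty and every downstream join
        // silently produces no rows.
        stmt.execute("INSERT OVERWRITE TABLE mentionProb "
                + "SELECT user_id, 0.5 FROM tweets"); // placeholder query
        conn.close();
    }
}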
@@ -1011,7 +1009,6 @@ class WF_ReportSaleSentimentHive(basePath: BaseTablePath, num_iter: Int) extends
 stmt.execute(drop_initalRank)
 stmt.execute(create_initialRank)
 stmt.execute(query_initialRank)
-
 
 val alpha = 0.85
 for(iteration <- 1 to num_iter) {
@@ -18,7 +18,7 @@ class TRVertexToHive extends SimpleVertexToHive[Text, DoubleWritable, Writable]
 
 record.set(0, item_sk)
 record.set(1, user_id)
-record.set(2, vertex.getValue().get())
+record.set(2, vertex.getValue.get)
 }
 
 }
@@ -12,18 +12,18 @@ class TwitterRankVertex extends Vertex[Text, DoubleWritable, DoubleWritable, Dou
 
 override def compute(messages: java.lang.Iterable[DoubleWritable]): Unit = {
 if (getSuperstep() > 0) {
-var twitterRank = 0.0;
+var twitterRank = 0.0
 val messages_iter = messages.iterator
 while (messages_iter.hasNext) {
-twitterRank += messages_iter.next.get;
+twitterRank += messages_iter.next.get
 }
-setValue(new DoubleWritable(twitterRank));
+setValue(new DoubleWritable(twitterRank))
 }
 
 if (getSuperstep() < 10) {
 sendMessageToAllEdges(new DoubleWritable(getValue.get))
 } else {
-voteToHalt();
+voteToHalt()
 }
 
 }
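Note: this hunk only drops Scala's redundant semicolons, but the compute method is the heart of the TwitterRank job and follows the standard Giraph BSP pattern: superstep 0 only seeds messages, later supersteps fold incoming messages into a new vertex value, and after ten rounds every vertex votes to halt. A condensed Java rendering of the same pattern, assuming the Giraph 1.0-era Vertex API; it mirrors the Scala above as a sketch, not a drop-in replacement:

import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;

// Sketch of the BSP pattern used by TwitterRankVertex.
public class RankVertexSketch
        extends Vertex<Text, DoubleWritable, DoubleWritable, DoubleWritable> {
    @Override
    public void compute(Iterable<DoubleWritable> messages) {
        if (getSuperstep() > 0) {
            double rank = 0.0;
            for (DoubleWritable m : messages) {
                rank += m.get();                 // fold neighbours' contributions
            }
            setValue(new DoubleWritable(rank));  // new rank for this superstep
        }
        if (getSuperstep() < 10) {
            sendMessageToAllEdges(new DoubleWritable(getValue().get()));
        } else {
            voteToHalt();                        // stop after a fixed 10 rounds
        }
    }
}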
@@ -21,6 +21,8 @@ import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.io.DoubleWritable
 import org.apache.hadoop.io.Text
+import org.apache.hadoop.mapreduce.JobContext
+import org.apache.hadoop.mapreduce.JobID
 
 import org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_EDGE_SPLITS
 import org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_TO_EDGE_CLASS
@@ -67,22 +69,23 @@ class WF_TwitterRank() extends Query with HiveGiraphRunnable{
 /**
 * Get the copy of the hive configuration
 */
-val hive_config_copy = new HiveConf(hive_config)
+var hive_config_copy = new HiveConf(hive_config)
 
-val workers = 1
+val workers = 2
 val dbName = "default"
-val edgeInputTableStr = "initialRank"
-val vertexInputTableStr = "transitMatrix"
-val vertexOutputTableStr = "twitterRank"
+val edgeInputTableStr = "transitmatrix"
+val vertexInputTableStr = "initialrank"
+val vertexOutputTableStr = "twitterrank"
 
 HIVE_TO_VERTEX_CLASS.set(hive_config_copy, classOf[InitialRankToVertex])
 HIVE_TO_EDGE_CLASS.set(hive_config_copy, classOf[TransitMatrixToEdge])
 hive_config_copy.setClass(HiveVertexWriter.VERTEX_TO_HIVE_KEY,
 classOf[TRVertexToHive], classOf[VertexToHive[Text, DoubleWritable, Writable]])
 
-val job = new GiraphJob(hive_config_copy, getClass().getName())
+var job = new GiraphJob(hive_config_copy, getClass().getName())
 var giraphConf = job.getConfiguration()
 
 giraphConf.setVertexClass(classOf[TwitterRankVertex])
 
 var hiveVertexInputDescription = new HiveInputDescription()
 var hiveEdgeInputDescription = new HiveInputDescription()
 var hiveOutputDescription = new HiveOutputDescription()
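Note: besides loosening two vals to vars and bumping the worker count to two, this hunk fixes the input wiring twice over. The edge and vertex tables were swapped (edges should come from transitmatrix to match TransitMatrixToEdge, vertices from initialrank to match InitialRankToVertex), and the names are now lower-case, since the Hive metastore stores table identifiers lower-cased regardless of how the DDL spelled them. A tiny hypothetical illustration of that case rule (not part of the commit):

// Hypothetical illustration: Hive lower-cases table identifiers in the
// metastore, so lookup strings must be lower-case even if the DDL used
// camelCase names such as "initialRank".
public class TableNameCheck {
    public static void main(String[] args) {
        String asWritten = "initialRank";          // name used in the DDL
        String asStored = asWritten.toLowerCase(); // name the metastore keeps
        System.out.println(asStored);              // prints "initialrank"
    }
}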
@@ -103,15 +106,30 @@ class WF_TwitterRank() extends Query with HiveGiraphRunnable{
 /**
 * Initialize the hive input settings
 */
 hiveVertexInputDescription.setNumSplits(HIVE_VERTEX_SPLITS.get(giraphConf))
+// hiveVertexInputDescription.setNumSplits(1)
 HiveApiInputFormat.setProfileInputDesc(giraphConf, hiveVertexInputDescription,
 VERTEX_INPUT_PROFILE_ID)
 giraphConf.setVertexInputFormatClass(classOf[HiveVertexInputFormat[Text, DoubleWritable, Writable]])
 HiveTableSchemas.put(giraphConf, VERTEX_INPUT_PROFILE_ID,
 hiveVertexInputDescription.hiveTableName())
 
-hiveEdgeInputDescription.setNumSplits(HIVE_EDGE_SPLITS.get(giraphConf));
+val properties = giraphConf.iterator
+
+while(properties.hasNext) {
+val map = properties.next
+println(map.getKey + ":" + map.getValue )
+}
+
+var defaultInputFormat = new HiveApiInputFormat()
+
+var splits = defaultInputFormat.getSplits(new JobContext(giraphConf, new JobID()))
+
+println("getSplits returned " + splits.size() + " splits")
+
+// hiveEdgeInputDescription.setNumSplits(1)
+hiveEdgeInputDescription.setNumSplits(HIVE_EDGE_SPLITS.get(giraphConf))
 HiveApiInputFormat.setProfileInputDesc(giraphConf, hiveEdgeInputDescription,
-EDGE_INPUT_PROFILE_ID);
+EDGE_INPUT_PROFILE_ID)
 giraphConf.setEdgeInputFormatClass(classOf[HiveEdgeInputFormat[Text, DoubleWritable]]);
 HiveTableSchemas.put(giraphConf, EDGE_INPUT_PROFILE_ID,
 hiveEdgeInputDescription.hiveTableName())
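Note: most of the added lines above are diagnostics. The loop dumps every effective configuration property, and the HiveApiInputFormat.getSplits probe checks that the Hive input actually yields splits before the Giraph job launches (using the Hadoop-1 concrete JobContext class, hence the two new imports). The property dump relies on Hadoop's Configuration being iterable; a self-contained sketch of that idiom:

import java.util.Map;

import org.apache.hadoop.conf.Configuration;

// Standalone sketch of the property-dump idiom used above: a Hadoop
// Configuration is Iterable over its effective key/value pairs.
public class ConfDump {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        for (Map.Entry<String, String> entry : conf) {
            System.out.println(entry.getKey() + ":" + entry.getValue());
        }
    }
}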
@@ -121,10 +139,10 @@ class WF_TwitterRank() extends Query with HiveGiraphRunnable{
 /**
 * Initialize the hive output settings
 */
 HiveApiOutputFormat.initProfile(giraphConf, hiveOutputDescription,
-VERTEX_OUTPUT_PROFILE_ID);
-giraphConf.setVertexOutputFormatClass(classOf[HiveVertexOutputFormat[Text, DoubleWritable, Writable]]);
+VERTEX_OUTPUT_PROFILE_ID)
+giraphConf.setVertexOutputFormatClass(classOf[HiveVertexOutputFormat[Text, DoubleWritable, Writable]])
 HiveTableSchemas.put(giraphConf, VERTEX_OUTPUT_PROFILE_ID,
-hiveOutputDescription.hiveTableName());
+hiveOutputDescription.hiveTableName())
 
 /**
 * Set number of workers
