Commit #12
andyhehk committed Nov 16, 2013
1 parent 96ec15c commit 02c90bb
Showing 13 changed files with 277 additions and 69 deletions.
@@ -32,6 +32,7 @@ public BIDomainInfo() {
graph_supportedEngine.add(Constants.VERTICA);
graph_supportedEngine.add(Constants.SPARK);
graph_supportedEngine.add(Constants.SHARK);
graph_supportedEngine.add(Constants.HIVE_GIRAPH);

nested_supportedEngine.add(Constants.HIVE);
nested_supportedEngine.add(Constants.HADOOP);
1 change: 1 addition & 0 deletions common/src/main/java/bigframe/util/Constants.java
@@ -24,6 +24,7 @@ public class Constants {

public static String VERTICA = "vertica";

public static String HIVE_GIRAPH = "hive_giraph";

public static String MICRO = "micro";
public static String MACRO = "macro";
@@ -82,20 +82,20 @@ public void generate() {
Runtime rt = Runtime.getRuntime();
Process proc = rt.exec(cmd, null, new File(dagegen_script_path));

// InputStream stderr = proc.getErrorStream();
InputStream stderr = proc.getErrorStream();
InputStream stdout = proc.getInputStream();


// InputStreamReader isr = new InputStreamReader(stderr);
// BufferedReader br = new BufferedReader(isr);
InputStreamReader isr = new InputStreamReader(stderr);
BufferedReader br = new BufferedReader(isr);
InputStreamReader isout = new InputStreamReader(stdout);
BufferedReader br1 = new BufferedReader(isout);


String line = null;

// while ( (line = br.readLine()) != null)
// LOG.error(line);
while ( (line = br.readLine()) != null)
LOG.error(line);

while ( (line = br1.readLine()) != null)
LOG.info(line);
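Side note on the re-enabled stderr loop above: it drains stderr to EOF before reading stdout, so a script that fills the stdout pipe buffer while stderr is still open can stall. A minimal sketch of the usual fix, draining stderr on a background thread, assuming the enclosing try block in generate() already handles IOException and InterruptedException and that the imports used in this file are available (names below are illustrative, not part of this commit):

// Sketch only: drain stderr concurrently so a full stdout pipe cannot stall the script.
final Process proc = Runtime.getRuntime().exec(cmd, null, new File(dagegen_script_path));

Thread stderrDrainer = new Thread(new Runnable() {
    public void run() {
        try {
            BufferedReader err = new BufferedReader(
                    new InputStreamReader(proc.getErrorStream()));
            String errLine;
            while ((errLine = err.readLine()) != null)
                LOG.error(errLine);               // same logging the commit re-enables
        } catch (IOException e) {
            LOG.error("Failed to read script stderr", e);
        }
    }
});
stderrDrainer.start();

BufferedReader out = new BufferedReader(new InputStreamReader(proc.getInputStream()));
String outLine;
while ((outLine = out.readLine()) != null)
    LOG.info(outLine);

stderrDrainer.join();                             // ensure stderr is fully logged
proc.waitFor();                                   // surface the script's exit status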
3 changes: 2 additions & 1 deletion project/BigFrameBuild.scala
@@ -95,7 +95,8 @@ object BigFrameBuild extends Build {
libraryDependencies ++= Seq(
"io.backchat.jerkson" % "jerkson_2.9.2" % "0.7.0",
"org.apache.mrunit" % "mrunit" % "1.0.0" % "test" classifier "hadoop1",
"org.apache.hive" % "hive-exec" % "0.12.0" % "provided"
"org.apache.hive" % "hive-exec" % "0.12.0" % "provided",
"org.apache.hive" % "hive-common" % "0.12.0" % "provided"
)
) ++ extraAssemblySettings ++ excludeJARfromCOMMON

@@ -0,0 +1,91 @@
package bigframe.qgen.engineDriver;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.log4j.Logger;

import bigframe.bigif.BigConfConstants;
import bigframe.bigif.WorkflowInputFormat;
import bigframe.workflows.runnable.HiveGiraphRunnable;
import bigframe.workflows.runnable.HiveRunnable;

public class HiveGiraphEngineDriver extends EngineDriver {

private HiveConf hive_config;
private static final Logger LOG = Logger.getLogger(HiveGiraphEngineDriver.class);
private List<HiveGiraphRunnable> queries = new ArrayList<HiveGiraphRunnable>();

private Connection connection;
private static String driverName = "org.apache.hadoop.hive.jdbc.HiveDriver";

public HiveGiraphEngineDriver(WorkflowInputFormat workIF) {
super(workIF);
hive_config = new HiveConf();
hive_config.addResource(new Path(workIF.getHiveHome()
+ "/conf/hive-site.xml"));
// TODO Auto-generated constructor stub
}

@Override
public int numOfQueries() {
// TODO Auto-generated method stub
return queries.size();
}

@Override
public void init() {
try {
Class.forName(driverName);
} catch (ClassNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
System.exit(1);
}

try {
LOG.info("Connectiong to Hive JDBC server!!!");
connection = DriverManager.getConnection(workIF.getHiveJDBCServer(), "", "");
if(connection == null) {
LOG.error("Cannot connect to JDBC server! " +
"Make sure the HiveServer is running!");
System.exit(1);
}
else
LOG.info("Successful!!!");

for(HiveGiraphRunnable query : queries) {
LOG.info("Prepare tables...");
query.prepareHiveGiraphTables(connection);
}

} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
System.exit(1);
}
}

@Override
public void run() {
for(HiveGiraphRunnable query : queries) {
query.runGiraph(hive_config);
}

}

@Override
public void cleanup() {
// TODO Auto-generated method stub

}

public void addQuery(HiveGiraphRunnable query) {
queries.add(query);
}
}
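For orientation (not shown as a standalone example in this diff): the new driver follows the same lifecycle as the other EngineDriver subclasses, and the BIDomainWorkflow change below wires it in the same way. A minimal usage sketch, with the class and method names purely illustrative:

import bigframe.bigif.WorkflowInputFormat;
import bigframe.qgen.engineDriver.HiveGiraphEngineDriver;
import bigframe.workflows.BusinessIntelligence.graph.exploratory.WF_TwitterRank;

// Illustrative helper, not part of the repository.
public class HiveGiraphExample {
    public static void runTwitterRank(WorkflowInputFormat workIF) {
        HiveGiraphEngineDriver driver = new HiveGiraphEngineDriver(workIF);

        // Queries must be registered before init(), which prepares their Hive tables.
        driver.addQuery(new WF_TwitterRank());

        driver.init();                 // connects to the Hive JDBC server and prepares tables
        if (driver.numOfQueries() > 0)
            driver.run();              // runs the Giraph computation for each registered query
        driver.cleanup();              // currently a no-op
    }
}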
21 changes: 15 additions & 6 deletions qgen/src/main/java/bigframe/qgen/factory/BIDomainWorkflow.java
@@ -13,6 +13,7 @@
import bigframe.bigif.appDomainInfo.BIDomainInfo;
import bigframe.qgen.engineDriver.HadoopEngineDriver;
import bigframe.qgen.engineDriver.HiveEngineDriver;
import bigframe.qgen.engineDriver.HiveGiraphEngineDriver;
import bigframe.qgen.engineDriver.SharkEngineDriver;
import bigframe.qgen.engineDriver.SparkEngineDriver;
import bigframe.qgen.engineDriver.EngineDriver;
@@ -23,6 +24,7 @@
import com.sun.org.apache.commons.logging.Log;
import com.sun.org.apache.commons.logging.LogFactory;


/**
* Encapsulate all information of the workflow for the BI domain.
*
@@ -69,7 +71,7 @@ public List<EngineDriver> getWorkflows() {
SharkEngineDriver sharkWorkflow = new SharkEngineDriver(workflowIF);
SparkEngineDriver sparkWorkflow = new SparkEngineDriver(workflowIF);
VerticaEngineDriver verticaWorkflow = new VerticaEngineDriver(workflowIF);

HiveGiraphEngineDriver hivegiraphWorkflow = new HiveGiraphEngineDriver(workflowIF);

/**
* Record the paths for all the base table used.
@@ -112,6 +114,11 @@ else if(dataVariety.contains(Constants.GRAPH)) {
if(graphEngine.equals(Constants.HADOOP)) {

}

else if(graphEngine.equals(Constants.HIVE_GIRAPH)) {
hivegiraphWorkflow.addQuery(new
bigframe.workflows.BusinessIntelligence.graph.exploratory.WF_TwitterRank());
}
}

else if(dataVariety.contains(Constants.NESTED)) {
@@ -148,10 +155,10 @@ else if(relationalEngine.equals(Constants.SPARK)) {

else if(dataVariety.contains(Constants.GRAPH)) {

if(graphEngine.equals(Constants.HADOOP)) {
hadoopWorkflow.addQuery(new
bigframe.workflows.BusinessIntelligence.graph.exploratory.WF_PageRank(basePath.graph_path()));
}
// if(graphEngine.equals(Constants.HADOOP)) {
// hadoopWorkflow.addQuery(new
// bigframe.workflows.BusinessIntelligence.graph.exploratory.WF_PageRank(basePath.graph_path()));
// }
}

else if(dataVariety.contains(Constants.NESTED)) {
@@ -230,10 +237,12 @@ else if(nestedEngine.equals(Constants.SPARK) && nestedEngine.equals(Constants.SP
workflows.add(hadoopWorkflow);
if(sharkWorkflow.numOfQueries() > 0)
workflows.add(sharkWorkflow);
if(sparkWorkflow.numOfQueries() > 0)
if((Integer)sparkWorkflow.numOfQueries() > 0)
workflows.add(sparkWorkflow);
if(verticaWorkflow.numOfQueries() > 0)
workflows.add(verticaWorkflow);
if(hivegiraphWorkflow.numOfQueries() > 0)
workflows.add(hivegiraphWorkflow);

return workflows;
}
4 changes: 2 additions & 2 deletions tools/tpcds/gen_data.pl
@@ -224,8 +224,8 @@ sub println {
system qq(ssh $ssh_opts $hosts[$host] ).
qq(\"cd $LOCAL_DIR; ./gen_and_load.sh >& gen_and_load.out\");
println qq(Data generation completed at host: $hosts[$host]\n);
#system qq(ssh $ssh_opts $hosts[$host] ).
# qq(\"cd ..; rm -r $LOCAL_DIR\");
system qq(ssh $ssh_opts $hosts[$host] ).
qq(\"cd ..; rm -r $LOCAL_DIR\");
exit(0);
}

@@ -41,7 +41,7 @@ class WF_ReportSaleSentimentHive(basePath: BaseTablePath, num_iter: Int) extends
val twitter_grapgHDFSPath = basePath.graph_path
val tweets_HDFSPath = basePath.nested_path

val stmt = connection.createStatement();
val stmt = connection.createStatement()


val add_JsonSerde = "ADD JAR "
@@ -784,19 +784,20 @@ class WF_ReportSaleSentimentHive(basePath: BaseTablePath, num_iter: Int) extends
try {
val stmt = connection.createStatement();

val promotSKs = "1"
val lower = 1
val upper = 300

val drop_promotionSelected = "DROP TABLE IF EXISTS promotionSelected"
val create_promotionSelected = "CREATE TABLE promotionSelected (promo_id string, item_sk int," +
"start_date_sk int, end_date_sk int)"

/**
* Choose all promotions except those that contain NULL values.
*/
val query_promotionSelected = "INSERT INTO TABLE promotionSelected" +
" SELECT p_promo_id, p_item_sk, p_start_date_sk, p_end_date_sk " +
" FROM promotion " +
" WHERE " + lower + " <= p_promo_sk AND p_promo_sk <= " + upper +
" AND p_item_sk IS NOT NULL AND p_start_date_sk IS NOT NULL AND p_end_date_sk IS NOT NULL"
" WHERE p_item_sk IS NOT NULL AND p_start_date_sk IS NOT NULL AND p_end_date_sk IS NOT NULL"
stmt.execute(drop_promotionSelected)
stmt.execute(create_promotionSelected)
stmt.execute(query_promotionSelected)
@@ -939,10 +940,6 @@ class WF_ReportSaleSentimentHive(basePath: BaseTablePath, num_iter: Int) extends
val drop_simUserByProd = "DROP VIEW IF EXISTS simUserByProd"
val create_simUserByProd = "CREATE VIEW simUserByProd " +
" (item_sk, follower_id, friend_id, similarity) AS" +
" SELECT * FROM" +
" (SELECT item_sk, user_id as follower_id, user_id as friend_id, COALESCE(0) as similarity" +
" FROM mentionProb" +
" UNION ALL" +
" SELECT f.item_sk, follower_id, friend_id, (1 - ABS(follower_prob - prob)) as similarity" +
" FROM " +
" (SELECT item_sk, follower_id, friend_id, prob as follower_prob" +
@@ -1032,7 +1029,6 @@ class WF_ReportSaleSentimentHive(basePath: BaseTablePath, num_iter: Int) extends
" (SELECT t1.item_sk, follower_id, sum(transit_prob * rank_score) as sum_follower_score" +
" FROM transitMatrix t1 JOIN " + twitterRank_previous +" t2" +
" ON t1.friend_id = t2.user_id AND t1.item_sk = t2.item_sk " +
" WHERE t1.follower_id != t2.user_id" +
" GROUP BY " +
" t1.item_sk, follower_id) t3" +
" RIGHT OUTER JOIN randSUffVec t4" +
@@ -62,7 +62,6 @@ class WF_ReportSaleSentimentVertica(basePath: BaseTablePath, num_iter: Int) exte
try {
val stmt = connection.createStatement();

val promotSKs = "1"
val lower = 1
val upper = 300

@@ -72,11 +71,13 @@ class WF_ReportSaleSentimentVertica(basePath: BaseTablePath, num_iter: Int) exte
stmt.execute(drop_promotionSelected)
stmt.execute(create_promotionSelected)

/**
* Choose all promotions except those that contain NULL values.
*/
val query_promotionSelected = "INSERT INTO promotionSelected " +
" SELECT p_promo_id, p_item_sk, p_start_date_sk, p_end_date_sk " +
" FROM promotion " +
" WHERE " + lower + " <= p_promo_sk AND p_promo_sk <= " + upper +
" AND p_item_sk IS NOT NULL AND p_start_date_sk IS NOT NULL AND p_end_date_sk IS NOT NULL"
" WHERE p_item_sk IS NOT NULL AND p_start_date_sk IS NOT NULL AND p_end_date_sk IS NOT NULL"


stmt.executeUpdate(query_promotionSelected)
@@ -231,10 +232,7 @@ class WF_ReportSaleSentimentVertica(basePath: BaseTablePath, num_iter: Int) exte
" FROM mentionProb JOIN twitter_graph " +
" ON user_id = follower_id) f" +
" JOIN mentionProb " +
" ON friend_id = user_id" +
" UNION ALL" +
" SELECT item_sk, user_id, user_id, 0" +
" FROM mentionProb"
" ON friend_id = user_id"

stmt.execute(drop_simUserByProd)
stmt.execute(create_simUserByProd)
@@ -312,7 +310,6 @@ class WF_ReportSaleSentimentVertica(basePath: BaseTablePath, num_iter: Int) exte
" (SELECT t1.item_sk, follower_id, sum(transit_prob * rank_score) as sum_follower_score" +
" FROM transitMatrix t1, " + twitterRank_previous +" t2" +
" WHERE t1.friend_id = t2.user_id AND t1.item_sk = t2.item_sk " +
" AND t1.follower_id != t2.user_id" +
" GROUP BY " +
" t1.item_sk, follower_id) t3" +
" RIGHT JOIN randSUffVec t4" +
@@ -0,0 +1,37 @@
package bigframe.workflows.BusinessIntelligence.graph.exploratory

import org.apache.giraph.graph.Vertex

import org.apache.hadoop.io.Writable
import org.apache.hadoop.io.DoubleWritable
import org.apache.hadoop.io.Text

import scala.collection.JavaConversions._

class TwitterRankVertex extends Vertex[Text, DoubleWritable, DoubleWritable, DoubleWritable] {

override def compute(messages: java.lang.Iterable[DoubleWritable]): Unit = {
if (getSuperstep() > 0) {
var twitterRank = 0.0;
val messages_iter = messages.iterator
while (messages_iter.hasNext) {
twitterRank += messages_iter.next.get;
}
setValue(new DoubleWritable(twitterRank));
}

if (getSuperstep() < 10) {
sendMessageToAllEdges(new DoubleWritable(getValue.get))
} else {
voteToHalt();
}

}

override def sendMessageToAllEdges(message: DoubleWritable): Unit = {
getEdges.iterator.foreach(e => sendMessage(e.getTargetVertexId,
new DoubleWritable(message.get *
getEdgeValue(e.getTargetVertexId).get)))
}

}
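Reading of the vertex above: after the initial superstep, each vertex replaces its value with the sum of its incoming messages, and for the first ten supersteps sends value * edge weight along every outgoing edge, which amounts to a weighted PageRank-style update

    value(v) <- sum over incoming edges (u -> v) of value(u) * weight(u -> v)

with voteToHalt() stopping the vertex after superstep 10. There is no damping or teleport term in this class; the random-surfer contribution that the Hive and Vertica workflows model with the randSUffVec table is not visible in this file, so whether it is applied elsewhere is not shown in this commit.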