diff --git a/bin/workloads/graph/nweight/prepare/prepare.sh b/bin/workloads/graph/nweight/prepare/prepare.sh
new file mode 100755
index 000000000..8b5cd96e2
--- /dev/null
+++ b/bin/workloads/graph/nweight/prepare/prepare.sh
@@ -0,0 +1,34 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+current_dir=$(dirname "$0")
+root_dir=${current_dir}/../../../../../
+workload_config=${root_dir}/conf/workloads/graph/nweight.conf
+. "${root_dir}/bin/functions/load-bench-config.sh"
+# Prepare step of the NWeight workload; helper functions and the upper-case variables are not defined in this script.
+enter_bench NWeightPrepare "${workload_config}"  # quoted: the path derives from $0 and may contain spaces
+show_bannar start
+# Best-effort cleanup of the input location; a failing rmr-hdfs is deliberately ignored via "|| true".
+rmr-hdfs $INPUT_HDFS || true
+START_TIME=$(timestamp)
+# Run the NWeightDataGenerator Spark class with MODEL_INPUT, INPUT_HDFS and EDGES as positional arguments.
+run-spark-job com.intel.hibench.sparkbench.graph.nweight.NWeightDataGenerator $MODEL_INPUT $INPUT_HDFS $EDGES
+
+END_TIME=$(timestamp)  # START_TIME/END_TIME are recorded but not passed to any report in this script
+
+show_bannar finish
+leave_bench
+
diff --git a/bin/workloads/graph/nweight/spark/run.sh b/bin/workloads/graph/nweight/spark/run.sh
new file mode 100755
index 000000000..029d9e821
--- /dev/null
+++ b/bin/workloads/graph/nweight/spark/run.sh
@@ -0,0 +1,34 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+current_dir=$(dirname "$0")
+root_dir=${current_dir}/../../../../../
+workload_config=${root_dir}/conf/workloads/graph/nweight.conf
+. "${root_dir}/bin/functions/load-bench-config.sh"
+# Run step of the NWeight workload; helper functions and the upper-case variables are not defined in this script.
+enter_bench ScalaSparkNWeight "${workload_config}"  # quoted: the path derives from $0 and may contain spaces
+show_bannar start
+# Best-effort cleanup of the output location; a failing rmr-hdfs is deliberately ignored via "|| true".
+rmr-hdfs $OUTPUT_HDFS || true
+
+SIZE=$(dir_size $INPUT_HDFS)  # captured before the job starts and passed to gen_report below
+START_TIME=$(timestamp)
+run-spark-job com.intel.hibench.sparkbench.graph.nweight.NWeight $INPUT_HDFS $OUTPUT_HDFS $DEGREE $MAX_OUT_EDGES $NUM_PARTITION $STORAGE_LEVEL $DISABLE_KRYO $MODEL
+END_TIME=$(timestamp)
+# Report the timestamps taken around the Spark job together with the input size.
+gen_report ${START_TIME} ${END_TIME} ${SIZE}
+show_bannar finish
+leave_bench
diff --git a/conf/hibench.conf b/conf/hibench.conf
index 2fcbf8442..27907fc82 100644
--- a/conf/hibench.conf
+++ b/conf/hibench.conf
@@ -43,13 +43,13 @@ hibench.hive.release hive-0.12.0-bin
hibench.hivebench.template.dir ${hibench.dependency.dir}/hivebench/hive_template
hibench.hive.dir.name.input ${hibench.workload.dir.name.input}
hibench.hive.dir.name.ouput ${hibench.workload.dir.name.output}
+
hibench.bayes.dir.name.input ${hibench.workload.dir.name.input}
hibench.bayes.dir.name.output ${hibench.workload.dir.name.output}
hibench.pagerank.dir.name.input ${hibench.workload.dir.name.input}
hibench.pagerank.dir.name.output ${hibench.workload.dir.name.output}
hibench.pagerank.pegasus.dir ${hibench.dependency.dir}/pegasus/target/pegasus-2.0-SNAPSHOT.jar
-
hibench.mahout.release.apache mahout-distribution-0.9
hibench.mahout.release.hdp mahout-distribution-0.9
hibench.mahout.release.cdh5 mahout-0.9-cdh5.1.0
@@ -66,4 +66,3 @@ hibench.nutch.home ${hibench.dependency.dir}/nutchindexing/target/${hibench.nut
hibench.dfsioe.dir.name.input ${hibench.workload.dir.name.input}
hibench.dfsioe.dir.name.output ${hibench.workload.dir.name.output}
-hibench.nweight.model_path ${hibench.dependency.dir}/sparkbench/src/main/scala/com/intel/sparkbench/nweight/model/user-features
diff --git a/conf/workloads/graph/nweight.conf b/conf/workloads/graph/nweight.conf
new file mode 100644
index 000000000..175fd3e15
--- /dev/null
+++ b/conf/workloads/graph/nweight.conf
@@ -0,0 +1,38 @@
+hibench.nweight.tiny.edges 100000
+hibench.nweight.tiny.degree 3
+hibench.nweight.tiny.max_out_edges 30
+hibench.nweight.small.edges 1000000
+hibench.nweight.small.degree 3
+hibench.nweight.small.max_out_edges 30
+hibench.nweight.large.edges 10000000
+hibench.nweight.large.degree 3
+hibench.nweight.large.max_out_edges 30
+hibench.nweight.huge.edges 100000000
+hibench.nweight.huge.degree 3
+hibench.nweight.huge.max_out_edges 30
+hibench.nweight.gigantic.edges 425000000
+hibench.nweight.gigantic.degree 3
+hibench.nweight.gigantic.max_out_edges 30
+hibench.nweight.bigdata.edges 4250000000
+hibench.nweight.bigdata.degree 3
+hibench.nweight.bigdata.max_out_edges 30
+
+hibench.nweight.edges ${hibench.nweight.${hibench.scale.profile}.edges}
+hibench.nweight.degree ${hibench.nweight.${hibench.scale.profile}.degree}
+hibench.nweight.max_out_edges ${hibench.nweight.${hibench.scale.profile}.max_out_edges}
+
+hibench.nweight.partitions ${hibench.default.map.parallelism}
+# Storage level for the RDDs. 0: off-heap, 1: disk_only, 3: memory_only, 5: memory_ser, 7: memory_and_disk, 9: memory_and_disk_ser
+hibench.nweight.storage_level 7
+hibench.nweight.disable_kryo false
+# NWeight implementation to run: graphx or pregel
+hibench.nweight.model graphx
+
+hibench.nweight.model_path ${hibench.home}/sparkbench/graph/src/main/resources/nweight-user-features
+
+hibench.sparkbench.jar ${hibench.home}/sparkbench/graph/target/sparkbench-graph-6.0-SNAPSHOT-${hibench.spark.version}-jar-with-dependencies.jar
+
+# export for shell script
+hibench.workload.input ${hibench.hdfs.data.dir}/NWeight/Input
+hibench.workload.output ${hibench.hdfs.data.dir}/NWeight/Output
+hibench.workload.edges ${hibench.nweight.edges}
diff --git a/sparkbench/graph/pom.xml b/sparkbench/graph/pom.xml
index 898001eef..818d923e4 100644
--- a/sparkbench/graph/pom.xml
+++ b/sparkbench/graph/pom.xml
@@ -13,4 +13,233 @@
jar
6.0-SNAPSHOT
sparkbench-graph
+
+
+
+
+ com.intel.hibench.sparkbench
+ sparkbench-common
+ ${project.version}
+
+
+ org.scala-lang
+ scala-library
+ ${scala.version}
+
+
+ junit
+ junit
+ ${junit.version}
+ test
+
+
+
+ org.apache.spark
+ spark-core_${scala.binary.version}
+ ${spark.version}
+ provided
+
+
+ org.apache.spark
+ spark-yarn_${scala.binary.version}
+ ${spark.version}
+ provided
+
+
+ org.apache.spark
+ spark-streaming_${scala.binary.version}
+ ${spark.version}
+ provided
+
+
+ org.apache.spark
+ spark-mllib_${scala.binary.version}
+ ${spark.version}
+ provided
+
+
+ org.apache.spark
+ spark-hive_${scala.binary.version}
+ ${spark.version}
+ provided
+
+
+ org.apache.spark
+ spark-graphx_${scala.binary.version}
+ ${spark.version}
+ provided
+
+
+ com.github.scopt
+ scopt_2.10
+ ${scopt.version}
+
+
+ log4j
+ log4j
+ ${log4j.version}
+ compile
+
+
+ org.apache.mahout
+ mahout-core
+ ${mahout.version}
+
+
+ org.apache.hadoop
+ hadoop-core
+
+
+
+
+ org.apache.mahout
+ mahout-math
+ ${mahout.version}
+
+
+ it.unimi.dsi
+ fastutil
+ ${fastutil.version}
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ ${maven-compiler-plugin.version}
+
+
+ net.alchim31.maven
+ scala-maven-plugin
+
+ ${scala.binary.version}
+ ${scala.version}
+
+
+
+ scala-compile-first
+ process-resources
+
+ add-source
+ compile
+
+
+
+ scala-test-compile
+ process-test-resources
+
+ testCompile
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+
+
+ default-jar
+ package
+
+ jar
+
+
+ ${project.artifactId}-${project.version}-spark${spark.bin.version}
+
+
+
+
+
+
+ maven-assembly-plugin
+ ${maven-assembly-plugin.version}
+
+ ${project.build.finalName}-spark${spark.bin.version}
+
+ jar-with-dependencies
+
+
+
+
+ make-assembly
+ package
+
+ single
+
+
+
+
+
+
+
+
+
+ spark1.6
+
+ 1.6.0
+ 1.6
+
+
+
+ spark
+ 1.6
+
+
+
+
+
+ spark2.0
+
+ 2.0.0
+ 2.0
+
+
+
+ spark
+ 2.0
+
+
+
+
+
+ defaultspark
+
+ 1.6.0
+ 1.6
+
+
+
+ !spark
+
+
+
+
+
+ MR2
+
+
+ org.apache.hadoop
+ hadoop-mapreduce-examples
+ ${hadoop.mr2.version}
+
+
+ org.apache.hadoop
+ hadoop-client
+ ${hadoop.mr2.version}
+
+
+ org.apache.hadoop
+ hadoop-hdfs
+
+
+
+
+
+ !MR1
+
+
+
+
+
diff --git a/sparkbench/src/main/scala/com/intel/sparkbench/nweight/model/user-features b/sparkbench/graph/src/main/resources/nweight-user-features
similarity index 100%
rename from sparkbench/src/main/scala/com/intel/sparkbench/nweight/model/user-features
rename to sparkbench/graph/src/main/resources/nweight-user-features
diff --git a/sparkbench/src/main/scala/com/intel/sparkbench/nweight/Driver.scala b/sparkbench/graph/src/main/scala/com/intel/hibench/sparkbench/graph/nweight/Driver.scala
similarity index 75%
rename from sparkbench/src/main/scala/com/intel/sparkbench/nweight/Driver.scala
rename to sparkbench/graph/src/main/scala/com/intel/hibench/sparkbench/graph/nweight/Driver.scala
index d3c1543f1..30c69db47 100644
--- a/sparkbench/src/main/scala/com/intel/sparkbench/nweight/Driver.scala
+++ b/sparkbench/graph/src/main/scala/com/intel/hibench/sparkbench/graph/nweight/Driver.scala
@@ -1,15 +1,26 @@
-package com.intel.sparkbench.nweight
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.intel.hibench.sparkbench.graph.nweight
import org.apache.spark.{SparkContext, SparkConf}
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.scheduler.{JobLogger, StatsReportListener}
-import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer}
-import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
-import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
-
/**
* Compute NWeight for Graph G(V, E) as defined below
* Weight(1)(u, v) = edge(u, v)
diff --git a/sparkbench/src/main/scala/com/intel/sparkbench/nweight/GraphxNWeight.scala b/sparkbench/graph/src/main/scala/com/intel/hibench/sparkbench/graph/nweight/GraphxNWeight.scala
similarity index 78%
rename from sparkbench/src/main/scala/com/intel/sparkbench/nweight/GraphxNWeight.scala
rename to sparkbench/graph/src/main/scala/com/intel/hibench/sparkbench/graph/nweight/GraphxNWeight.scala
index 5c6969431..76c36a077 100644
--- a/sparkbench/src/main/scala/com/intel/sparkbench/nweight/GraphxNWeight.scala
+++ b/sparkbench/graph/src/main/scala/com/intel/hibench/sparkbench/graph/nweight/GraphxNWeight.scala
@@ -1,8 +1,24 @@
-package com.intel.sparkbench.nweight
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.intel.hibench.sparkbench.graph.nweight
import scala.collection.JavaConversions._
import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.HashPartitioner
import org.apache.spark.storage.StorageLevel
@@ -96,4 +112,3 @@ object GraphxNWeight extends Serializable{
}.saveAsTextFile(output)
}
}
-
diff --git a/sparkbench/src/main/scala/com/intel/sparkbench/nweight/datagen/NWeightDataGenerator.scala b/sparkbench/graph/src/main/scala/com/intel/hibench/sparkbench/graph/nweight/NWeightDataGenerator.scala
similarity index 80%
rename from sparkbench/src/main/scala/com/intel/sparkbench/nweight/datagen/NWeightDataGenerator.scala
rename to sparkbench/graph/src/main/scala/com/intel/hibench/sparkbench/graph/nweight/NWeightDataGenerator.scala
index d67f895ad..7ee02446a 100644
--- a/sparkbench/src/main/scala/com/intel/sparkbench/nweight/datagen/NWeightDataGenerator.scala
+++ b/sparkbench/graph/src/main/scala/com/intel/hibench/sparkbench/graph/nweight/NWeightDataGenerator.scala
@@ -1,11 +1,26 @@
-package com.intel.sparkbench.nweight.datagen
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.intel.hibench.sparkbench.graph.nweight
import java.io._
-import com.intel.sparkbench.IOCommon
-
+import com.intel.hibench.sparkbench.common.IOCommon
import org.apache.spark.HashPartitioner
-import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.recommendation.{MatrixFactorizationModel, Rating}
import org.apache.spark.rdd.RDD
diff --git a/sparkbench/src/main/scala/com/intel/sparkbench/nweight/PregelNWeight.scala b/sparkbench/graph/src/main/scala/com/intel/hibench/sparkbench/graph/nweight/PregelNWeight.scala
similarity index 65%
rename from sparkbench/src/main/scala/com/intel/sparkbench/nweight/PregelNWeight.scala
rename to sparkbench/graph/src/main/scala/com/intel/hibench/sparkbench/graph/nweight/PregelNWeight.scala
index cff2d21fe..9fbf20752 100644
--- a/sparkbench/src/main/scala/com/intel/sparkbench/nweight/PregelNWeight.scala
+++ b/sparkbench/graph/src/main/scala/com/intel/hibench/sparkbench/graph/nweight/PregelNWeight.scala
@@ -1,19 +1,39 @@
-package com.intel.sparkbench.nweight
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.intel.hibench.sparkbench.graph.nweight
import scala.collection.JavaConversions._
import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
import org.apache.spark.HashPartitioner
import org.apache.spark.storage.StorageLevel
import org.apache.spark.graphx._
import org.apache.spark.graphx.impl.GraphImpl
import it.unimi.dsi.fastutil.longs.Long2DoubleOpenHashMap
-/** * Compute NWeight for Graph G(V, E) as defined below * Weight(1)(u, v) = edge(u, v)
- * Weight(n)(u, v) = Sum (over {x|there are edges (u, x) and (x, v)}) Weight(n-1)(u, x)*Weight(1)(x, v)
+/**
+ * Compute NWeight for Graph G(V, E) as defined below.
+ *
+ * Weight(1)(u, v) = edge(u, v)
+ * Weight(n)(u, v) =
+ * Sum (over {x|there are edges (u, x) and (x, v)}) Weight(n-1)(u, x) * Weight(1)(x, v)
*
- * Input is given in Text file format. Each line represents a Node and all out edges of that node (edge weight specified)
+ * Input is given in Text file format. Each line represents a Node and all out edges of that node
+ * (edge weight specified)
* :, : ...)
*/
@@ -33,7 +53,8 @@ object PregelNWeight extends Serializable{
def mergMsg(c1: Long2DoubleOpenHashMap, c2: Long2DoubleOpenHashMap) = {
c2.long2DoubleEntrySet()
.fastIterator()
- .foreach(pair => c1.put(pair.getLongKey(), c1.get(pair.getLongKey()) + pair.getDoubleValue()))
+ .foreach(pair =>
+ c1.put(pair.getLongKey(), c1.get(pair.getLongKey()) + pair.getDoubleValue()))
c1
}
@@ -70,7 +91,8 @@ object PregelNWeight extends Serializable{
var g = GraphImpl(edges, new SizedPriorityQueue(maxDegree), storageLevel, storageLevel).cache()
- g = Pregel(g, new Long2DoubleOpenHashMap, step, EdgeDirection.In)(vProg _, sendMsg _, mergMsg _)
+ g = Pregel(g, new Long2DoubleOpenHashMap, step, EdgeDirection.In)(
+ vProg _, sendMsg _, mergMsg _)
g.vertices.map { case (vid, vdata) =>
var s = new StringBuilder
diff --git a/sparkbench/src/main/scala/com/intel/sparkbench/nweight/Utils.scala b/sparkbench/graph/src/main/scala/com/intel/hibench/sparkbench/graph/nweight/Utils.scala
similarity index 53%
rename from sparkbench/src/main/scala/com/intel/sparkbench/nweight/Utils.scala
rename to sparkbench/graph/src/main/scala/com/intel/hibench/sparkbench/graph/nweight/Utils.scala
index 6b5dc878b..1a6ffd39a 100644
--- a/sparkbench/src/main/scala/com/intel/sparkbench/nweight/Utils.scala
+++ b/sparkbench/graph/src/main/scala/com/intel/hibench/sparkbench/graph/nweight/Utils.scala
@@ -1,4 +1,21 @@
-package com.intel.sparkbench.nweight
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.intel.hibench.sparkbench.graph.nweight
import it.unimi.dsi.fastutil.objects.ObjectHeaps
@@ -6,7 +23,19 @@ class SizedPriorityQueue(
val capacity:Int) extends Traversable[(Long, Double)] with Serializable {
protected val buf = new Array[(Long, Double)](capacity)
protected val comparator = new java.util.Comparator[(Long, Double)] with Serializable {
- override def compare(m1: (Long, Double), m2: (Long, Double)) = if (m1._2 < m2._2) -1 else (if (m1._2 > m2._2) 1 else (if (m1._1 < m2._1) -1 else (if (m1._1 > m2._1) 1 else 0)))
+ override def compare(m1: (Long, Double), m2: (Long, Double)) : Int = { // returns -1, 1 or 0
+ if (m1._2 < m2._2) { // primary key: second tuple element, ascending
+ -1
+ } else if (m1._2 > m2._2) {
+ 1
+ } else if (m1._1 < m2._1) { // tie-break on the first element, ascending; also reached when a _2 is NaN
+ -1
+ } else if (m1._1 > m2._1) {
+ 1
+ } else {
+ 0 // neither element is less or greater
+ }
+ }
}
protected var size_ = 0
@@ -18,7 +47,6 @@ class SizedPriorityQueue(
}
def fullySorted(): Array[(Long, Double)] = {
- import scala.collection.JavaConversions._
val slicedBuf = buf.slice(0, size_ - 1)
java.util.Arrays.sort(slicedBuf, comparator)
slicedBuf