Permalink
Browse files

Fixed scala style as per review

  • Loading branch information...
anitatailor committed Mar 6, 2014
1 parent 19480b7 commit 3493f81e81b783afa35cd996219bfe36db383c1b
Showing with 48 additions and 35 deletions.
  1. +48 −35 examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala
@@ -17,39 +17,49 @@
package org.apache.spark.examples
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.hadoop.mapreduce.Job
+import java.nio.ByteBuffer
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ListBuffer
+import scala.collection.immutable.Map
import org.apache.cassandra.hadoop.ConfigHelper
-import org.apache.cassandra.utils.ByteBufferUtil
import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat
-import org.apache.cassandra.db.IColumn
import org.apache.cassandra.hadoop.cql3.CqlConfigHelper
import org.apache.cassandra.hadoop.cql3.CqlOutputFormat
-import scala.collection.JavaConversions._
-import java.nio.ByteBuffer
-import scala.collection.mutable.ListBuffer
-import scala.collection.immutable.Map
+import org.apache.cassandra.utils.ByteBufferUtil
+import org.apache.hadoop.mapreduce.Job
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
/*
Need to create following keyspace and column family in cassandra before running this example
Start CQL shell using ./bin/cqlsh and execute following commands
CREATE KEYSPACE retail WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
use retail;
-
- CREATE TABLE salecount (product_id text,sale_count int, PRIMARY KEY (product_id));
+ CREATE TABLE salecount (prod_id text, sale_count int, PRIMARY KEY (prod_id));
CREATE TABLE ordercf (user_id text,
time timestamp,
- product_id text,
+ prod_id text,
quantity int,
PRIMARY KEY (user_id, time));
- INSERT INTO ordercf (user_id, time, product_id, quantity) VALUES ('bob', 1385983646000, 'iphone', 1);
- INSERT INTO ordercf (user_id, time, product_id, quantity) VALUES ('tom', 1385983647000, 'samsung', 4);
- INSERT INTO ordercf (user_id, time, product_id, quantity) VALUES ('dora', 1385983648000, 'nokia', 2);
- INSERT INTO ordercf (user_id, time, product_id, quantity) VALUES ('charlie', 1385983649000, 'iphone', 2);
+ INSERT INTO ordercf (user_id,
+ time,
+ prod_id,
+ quantity) VALUES ('bob', 1385983646000, 'iphone', 1);
+ INSERT INTO ordercf (user_id,
+ time,
+ prod_id,
+ quantity) VALUES ('tom', 1385983647000, 'samsung', 4);
+ INSERT INTO ordercf (user_id,
+ time,
+ prod_id,
+ quantity) VALUES ('dora', 1385983648000, 'nokia', 2);
+ INSERT INTO ordercf (user_id,
+ time,
+ prod_id,
+ quantity) VALUES ('charlie', 1385983649000, 'iphone', 2);
*/
-/*
+/**
* This example demonstrates how to read and write to cassandra column family created using CQL3
* using Spark.
* Parameters : <spark_master> <cassandra_node> <cassandra_port>
@@ -59,7 +69,10 @@ import scala.collection.immutable.Map
object CassandraCQLTest {
def main(args: Array[String]) {
- val sc = new SparkContext(args(0), "CQLTestApp", System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
+ val sc = new SparkContext(args(0),
+ "CQLTestApp",
+ System.getenv("SPARK_HOME"),
+ SparkContext.jarOfClass(this.getClass))
val cHost: String = args(1)
val cPort: String = args(2)
val KeySpace = "retail"
@@ -72,44 +85,44 @@ object CassandraCQLTest {
ConfigHelper.setInputRpcPort(job.getConfiguration(), cPort)
ConfigHelper.setInputColumnFamily(job.getConfiguration(), KeySpace, InputColumnFamily)
ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner")
- CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "3");
+ CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "3")
- // CqlConfigHelper.setInputWhereClauses(job.getConfiguration(), "user_id='bob'");
+ /** CqlConfigHelper.setInputWhereClauses(job.getConfiguration(), "user_id='bob'") */
- // An UPDATE writes one or more columns to a record in a Cassandra column family.
- val query:String = "UPDATE " + KeySpace + "." + OutputColumnFamily + " SET sale_count = ? "
- CqlConfigHelper.setOutputCql(job.getConfiguration(), query);
+ /** An UPDATE writes one or more columns to a record in a Cassandra column family */
+ val query = "UPDATE " + KeySpace + "." + OutputColumnFamily + " SET sale_count = ? "
+ CqlConfigHelper.setOutputCql(job.getConfiguration(), query)
- job.setOutputFormatClass(classOf[CqlOutputFormat]);
- ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KeySpace, OutputColumnFamily);
- ConfigHelper.setOutputInitialAddress(job.getConfiguration(), cHost);
+ job.setOutputFormatClass(classOf[CqlOutputFormat])
+ ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KeySpace, OutputColumnFamily)
+ ConfigHelper.setOutputInitialAddress(job.getConfiguration(), cHost)
ConfigHelper.setOutputRpcPort(job.getConfiguration(), cPort)
- ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
+ ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner")
val casRdd = sc.newAPIHadoopRDD(job.getConfiguration(),
classOf[CqlPagingInputFormat],
classOf[java.util.Map[String,ByteBuffer]],
classOf[java.util.Map[String,ByteBuffer]])
- println("Count: " + casRdd.count);
+ println("Count: " + casRdd.count)
val productSaleRDD = casRdd.map {
case (key, value) => {
- (ByteBufferUtil.string(value.get("product_id")),ByteBufferUtil.toInt(value.get("quantity")))
+ (ByteBufferUtil.string(value.get("prod_id")), ByteBufferUtil.toInt(value.get("quantity")))
}
}
- val aggregatedRDD = productSaleRDD.reduceByKey(_+_)
+ val aggregatedRDD = productSaleRDD.reduceByKey(_ + _)
aggregatedRDD.collect().foreach {
case (productId, saleCount) => println(productId + ":" + saleCount)
}
val casoutputCF = aggregatedRDD.map {
case (productId, saleCount) => {
- val outColFamKey = Map("product_id" ->ByteBufferUtil.bytes(productId))
- val outKey : java.util.Map[String, ByteBuffer] = outColFamKey
- var outColFamVal = new ListBuffer[ByteBuffer]
+ val outColFamKey = Map("prod_id" -> ByteBufferUtil.bytes(productId))
+ val outKey: java.util.Map[String, ByteBuffer] = outColFamKey
+ var outColFamVal = new ListBuffer[ByteBuffer]
outColFamVal += ByteBufferUtil.bytes(saleCount)
- val outVal : java.util.List[ByteBuffer] = outColFamVal
- (outKey,outVal)
+ val outVal: java.util.List[ByteBuffer] = outColFamVal
+ (outKey, outVal)
}
}

0 comments on commit 3493f81

Please sign in to comment.