Skip to content

Commit

Permalink
Fixed scala style as per review
Browse files Browse the repository at this point in the history
  • Loading branch information
anitatailor committed Mar 6, 2014
1 parent 19480b7 commit 3493f81
Showing 1 changed file with 48 additions and 35 deletions.
Expand Up @@ -17,39 +17,49 @@

package org.apache.spark.examples

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.hadoop.mapreduce.Job
import java.nio.ByteBuffer
import scala.collection.JavaConversions._
import scala.collection.mutable.ListBuffer
import scala.collection.immutable.Map
import org.apache.cassandra.hadoop.ConfigHelper
import org.apache.cassandra.utils.ByteBufferUtil
import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat
import org.apache.cassandra.db.IColumn
import org.apache.cassandra.hadoop.cql3.CqlConfigHelper
import org.apache.cassandra.hadoop.cql3.CqlOutputFormat
import scala.collection.JavaConversions._
import java.nio.ByteBuffer
import scala.collection.mutable.ListBuffer
import scala.collection.immutable.Map
import org.apache.cassandra.utils.ByteBufferUtil
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

/*
Need to create following keyspace and column family in cassandra before running this example
Start CQL shell using ./bin/cqlsh and execute following commands
CREATE KEYSPACE retail WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
use retail;
CREATE TABLE salecount (product_id text,sale_count int, PRIMARY KEY (product_id));
CREATE TABLE salecount (prod_id text, sale_count int, PRIMARY KEY (prod_id));
CREATE TABLE ordercf (user_id text,
time timestamp,
product_id text,
prod_id text,
quantity int,
PRIMARY KEY (user_id, time));
INSERT INTO ordercf (user_id, time, product_id, quantity) VALUES ('bob', 1385983646000, 'iphone', 1);
INSERT INTO ordercf (user_id, time, product_id, quantity) VALUES ('tom', 1385983647000, 'samsung', 4);
INSERT INTO ordercf (user_id, time, product_id, quantity) VALUES ('dora', 1385983648000, 'nokia', 2);
INSERT INTO ordercf (user_id, time, product_id, quantity) VALUES ('charlie', 1385983649000, 'iphone', 2);
INSERT INTO ordercf (user_id,
time,
prod_id,
quantity) VALUES ('bob', 1385983646000, 'iphone', 1);
INSERT INTO ordercf (user_id,
time,
prod_id,
quantity) VALUES ('tom', 1385983647000, 'samsung', 4);
INSERT INTO ordercf (user_id,
time,
prod_id,
quantity) VALUES ('dora', 1385983648000, 'nokia', 2);
INSERT INTO ordercf (user_id,
time,
prod_id,
quantity) VALUES ('charlie', 1385983649000, 'iphone', 2);
*/

/*
/**
* This example demonstrates how to read and write to cassandra column family created using CQL3
* using Spark.
* Parameters : <spark_master> <cassandra_node> <cassandra_port>
Expand All @@ -59,7 +69,10 @@ import scala.collection.immutable.Map
object CassandraCQLTest {

def main(args: Array[String]) {
val sc = new SparkContext(args(0), "CQLTestApp", System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
val sc = new SparkContext(args(0),
"CQLTestApp",
System.getenv("SPARK_HOME"),
SparkContext.jarOfClass(this.getClass))
val cHost: String = args(1)
val cPort: String = args(2)
val KeySpace = "retail"
Expand All @@ -72,44 +85,44 @@ object CassandraCQLTest {
ConfigHelper.setInputRpcPort(job.getConfiguration(), cPort)
ConfigHelper.setInputColumnFamily(job.getConfiguration(), KeySpace, InputColumnFamily)
ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner")
CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "3");
CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "3")

// CqlConfigHelper.setInputWhereClauses(job.getConfiguration(), "user_id='bob'");
/** CqlConfigHelper.setInputWhereClauses(job.getConfiguration(), "user_id='bob'") */

// An UPDATE writes one or more columns to a record in a Cassandra column family.
val query:String = "UPDATE " + KeySpace + "." + OutputColumnFamily + " SET sale_count = ? "
CqlConfigHelper.setOutputCql(job.getConfiguration(), query);
/** An UPDATE writes one or more columns to a record in a Cassandra column family */
val query = "UPDATE " + KeySpace + "." + OutputColumnFamily + " SET sale_count = ? "
CqlConfigHelper.setOutputCql(job.getConfiguration(), query)

job.setOutputFormatClass(classOf[CqlOutputFormat]);
ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KeySpace, OutputColumnFamily);
ConfigHelper.setOutputInitialAddress(job.getConfiguration(), cHost);
job.setOutputFormatClass(classOf[CqlOutputFormat])
ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KeySpace, OutputColumnFamily)
ConfigHelper.setOutputInitialAddress(job.getConfiguration(), cHost)
ConfigHelper.setOutputRpcPort(job.getConfiguration(), cPort)
ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner")

val casRdd = sc.newAPIHadoopRDD(job.getConfiguration(),
classOf[CqlPagingInputFormat],
classOf[java.util.Map[String,ByteBuffer]],
classOf[java.util.Map[String,ByteBuffer]])

println("Count: " + casRdd.count);
println("Count: " + casRdd.count)
val productSaleRDD = casRdd.map {
case (key, value) => {
(ByteBufferUtil.string(value.get("product_id")),ByteBufferUtil.toInt(value.get("quantity")))
(ByteBufferUtil.string(value.get("prod_id")), ByteBufferUtil.toInt(value.get("quantity")))
}
}
val aggregatedRDD = productSaleRDD.reduceByKey(_+_)
val aggregatedRDD = productSaleRDD.reduceByKey(_ + _)
aggregatedRDD.collect().foreach {
case (productId, saleCount) => println(productId + ":" + saleCount)
}

val casoutputCF = aggregatedRDD.map {
case (productId, saleCount) => {
val outColFamKey = Map("product_id" ->ByteBufferUtil.bytes(productId))
val outKey : java.util.Map[String, ByteBuffer] = outColFamKey
var outColFamVal = new ListBuffer[ByteBuffer]
val outColFamKey = Map("prod_id" -> ByteBufferUtil.bytes(productId))
val outKey: java.util.Map[String, ByteBuffer] = outColFamKey
var outColFamVal = new ListBuffer[ByteBuffer]
outColFamVal += ByteBufferUtil.bytes(saleCount)
val outVal : java.util.List[ByteBuffer] = outColFamVal
(outKey,outVal)
val outVal: java.util.List[ByteBuffer] = outColFamVal
(outKey, outVal)
}
}

Expand Down

0 comments on commit 3493f81

Please sign in to comment.