Example for Cassandra CQL read/write from Spark

1 parent cda381f commit 19480b7e5934d920dec2f9e06e6c3b1270e2f6e9 @anitatailor committed Mar 6, 2014
Showing with 138 additions and 0 deletions.
  1. +138 −0 examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.hadoop.mapreduce.Job
+import org.apache.cassandra.hadoop.ConfigHelper
+import org.apache.cassandra.utils.ByteBufferUtil
+import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat
+import org.apache.cassandra.hadoop.cql3.CqlConfigHelper
+import org.apache.cassandra.hadoop.cql3.CqlOutputFormat
+import scala.collection.JavaConversions._
+import java.nio.ByteBuffer
+import scala.collection.mutable.ListBuffer
+
+/*
+ Create the following keyspace and column families in Cassandra before running this
+ example. Start the CQL shell using ./bin/cqlsh and execute the following commands:
+ CREATE KEYSPACE retail WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
+ use retail;
+
+ CREATE TABLE salecount (product_id text, sale_count int, PRIMARY KEY (product_id));
+ CREATE TABLE ordercf (user_id text,
+ time timestamp,
+ product_id text,
+ quantity int,
+ PRIMARY KEY (user_id, time));
+ INSERT INTO ordercf (user_id, time, product_id, quantity) VALUES ('bob', 1385983646000, 'iphone', 1);
+ INSERT INTO ordercf (user_id, time, product_id, quantity) VALUES ('tom', 1385983647000, 'samsung', 4);
+ INSERT INTO ordercf (user_id, time, product_id, quantity) VALUES ('dora', 1385983648000, 'nokia', 2);
+ INSERT INTO ordercf (user_id, time, product_id, quantity) VALUES ('charlie', 1385983649000, 'iphone', 2);
+*/
+
+/*
+ * This example demonstrates how to use Spark to read from and write to Cassandra
+ * column families created with CQL3.
+ * Parameters: <spark_master> <cassandra_node> <cassandra_port>
+ * Usage: ./bin/run-example org.apache.spark.examples.CassandraCQLTest local[2] localhost 9160
+ *
+ */
+object CassandraCQLTest {
+
+  def main(args: Array[String]) {
+    if (args.length < 3) {
+      System.err.println("Usage: CassandraCQLTest <spark_master> <cassandra_node> <cassandra_port>")
+      System.exit(1)
+    }
+    val sc = new SparkContext(args(0), "CQLTestApp", System.getenv("SPARK_HOME"),
+      SparkContext.jarOfClass(this.getClass))
+ val cHost: String = args(1)
+ val cPort: String = args(2)
+ val KeySpace = "retail"
+ val InputColumnFamily = "ordercf"
+ val OutputColumnFamily = "salecount"
+
+ val job = new Job()
+ job.setInputFormatClass(classOf[CqlPagingInputFormat])
+ ConfigHelper.setInputInitialAddress(job.getConfiguration(), cHost)
+ ConfigHelper.setInputRpcPort(job.getConfiguration(), cPort)
+ ConfigHelper.setInputColumnFamily(job.getConfiguration(), KeySpace, InputColumnFamily)
+ ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner")
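+    // Page size for CqlPagingInputFormat: at most 3 CQL rows are fetched per
+    // page. So small a value only makes sense for this tiny demo dataset.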
+    CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "3")
+
+ // CqlConfigHelper.setInputWhereClauses(job.getConfiguration(), "user_id='bob'");
+
+ // An UPDATE writes one or more columns to a record in a Cassandra column family.
+    val query: String = "UPDATE " + KeySpace + "." + OutputColumnFamily + " SET sale_count = ? "
+    CqlConfigHelper.setOutputCql(job.getConfiguration(), query)
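+    // CqlOutputFormat binds the "?" in the UPDATE from each output record's
+    // value list; the primary key columns come from the record's key map.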
+
+    job.setOutputFormatClass(classOf[CqlOutputFormat])
+    ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KeySpace, OutputColumnFamily)
+    ConfigHelper.setOutputInitialAddress(job.getConfiguration(), cHost)
+    ConfigHelper.setOutputRpcPort(job.getConfiguration(), cPort)
+    ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner")
+
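+    // Each RDD element is a (key, value) pair of java.util.Map[String, ByteBuffer]:
+    // the key holds the row's key columns, the value the remaining columns.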
+    val casRdd = sc.newAPIHadoopRDD(job.getConfiguration(),
+      classOf[CqlPagingInputFormat],
+      classOf[java.util.Map[String, ByteBuffer]],
+      classOf[java.util.Map[String, ByteBuffer]])
+
+    println("Count: " + casRdd.count)
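+    // Decode the product_id (text) and quantity (int) columns of each order row.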
+ val productSaleRDD = casRdd.map {
+ case (key, value) => {
+        (ByteBufferUtil.string(value.get("product_id")), ByteBufferUtil.toInt(value.get("quantity")))
+ }
+ }
+    val aggregatedRDD = productSaleRDD.reduceByKey(_ + _)
+ aggregatedRDD.collect().foreach {
+ case (productId, saleCount) => println(productId + ":" + saleCount)
+ }
+
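+    // Reshape each (productId, saleCount) pair into the (key map, value list)
+    // form that CqlOutputFormat expects for the UPDATE statement above.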
+ val casoutputCF = aggregatedRDD.map {
+ case (productId, saleCount) => {
+        val outColFamKey = Map("product_id" -> ByteBufferUtil.bytes(productId))
+        val outKey: java.util.Map[String, ByteBuffer] = outColFamKey
+        val outColFamVal = new ListBuffer[ByteBuffer]
+        outColFamVal += ByteBufferUtil.bytes(saleCount)
+        val outVal: java.util.List[ByteBuffer] = outColFamVal
+        (outKey, outVal)
+ }
+ }
+
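+    // CqlOutputFormat writes to Cassandra itself, not to the given output path,
+    // so the keyspace name here is effectively just a placeholder path.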
+ casoutputCF.saveAsNewAPIHadoopFile(
+ KeySpace,
+ classOf[java.util.Map[String, ByteBuffer]],
+ classOf[java.util.List[ByteBuffer]],
+ classOf[CqlOutputFormat],
+ job.getConfiguration()
+ )
+ }
+}
