## Spark + Scala + Cassandra examples

Imports! The DataStax jars are baked into my Dockerfile.notebook file. This is a slight pain: the Scala version of the jars has to match the one used on the Spark workers.

In [1]:
import scala.math.random                                                                   
import org.apache.spark.SparkContext                                                       
import org.apache.spark.SparkContext._                                                     
import org.apache.spark.SparkConf                                                          
import org.apache.spark.sql.SparkSession                                                   
import com.datastax.spark.connector._                                                      
import org.apache.spark.sql.cassandra._ 
import com.datastax.spark.connector.cql.CassandraConnectorConf
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.DataFrame

Find the hostname by running `kubectl get pods -o wide`; the IP of the Cassandra pod is the one to use.

In [2]:
// Pod IP used as the Cassandra connection host in the cells below.
val hostname: String = "100.96.2.8"

Set up a SparkContext

In [3]:
// Assemble the Spark configuration in one fluent chain: application name,
// the multiple-contexts flag, and the Cassandra host the connector talks to.
// NOTE(review): allowMultipleContexts is presumably needed because the
// notebook kernel already owns a context — confirm before removing it.
val conf = (
  new SparkConf()
    .setAppName("SparkApp")
    .set("spark.driver.allowMultipleContexts", "true")
    .set("spark.cassandra.connection.host", hostname)
)
// Start the context that the RDD-based cells below use.
val sc = new SparkContext(conf)

Create a couple of sample rows (station_id, time, value) and insert them into Cassandra; the `dev.measures` table must already exist

In [4]:
import java.time.Instant
// Insert two sample rows into dev.measures (columns: station_id, time, value).
// "time" is a timestamp column, and the connector interprets a Long written to
// a timestamp as milliseconds since the epoch. Epoch *seconds* would therefore
// be stored as a moment in January 1970, so take the current time in millis.
val ts = Instant.now.toEpochMilli
val collection = sc.parallelize(Seq(("key3", ts, 3), ("key4", ts, 4)))
collection.saveToCassandra("dev", "measures", SomeColumns("station_id", "time", "value"))

[Stage 0:>                                                          (0 + 0) / 2]

In [5]:
// Read dev.measures as an RDD of Cassandra rows and print the largest "value".
// The binding is never reassigned in this cell, so it is a val rather than a var.
// NOTE(review): "value" is a decimal column (see the schema printed further
// down), so getInt presumably drops the fractional part — confirm that an
// integer maximum is what is wanted here.
val rdd = sc.cassandraTable("dev", "measures")
println(rdd.map(_.getInt("value")).max())

199                                                                             


In [6]:
// Obtain (or reuse) a SparkSession for the DataFrame / SQL cells.
val spark = (
  SparkSession.builder
    .appName("Simple Application")
    .getOrCreate()
)
// Register the Cassandra host under the cluster name "cluster"; the reads
// below refer to that name.
spark.setCassandraConf(
  "cluster",
  CassandraConnectorConf.ConnectionHostParam.option(hostname)
)

org.apache.spark.sql.SparkSession@25d764e9

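This loads the table (if it exists) as a DataFrame and prints its physical plan and schema; the `show()` calls further down display at most 20 rows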

In [9]:
// Load dev.measures through the Cassandra data source, one option per call.
val df = (
  spark.read
    .format("org.apache.spark.sql.cassandra")
    .option("table", "measures")
    .option("keyspace", "dev")
    .option("cluster", "cluster")
    .load()
)
// Print the physical plan, then the column schema.
df.explain()
df.printSchema()

== Physical Plan ==
*Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@664e3b77 [station_id#8,time#9,value#10] ReadSchema: struct<station_id:string,time:timestamp,value:decimal(38,18)>
root
 |-- station_id: string (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- value: decimal(38,18) (nullable = true)



In [10]:
// DDL that registers a temporary view named "words" over the Cassandra table
// dev.measures, using the cluster name "cluster" (the name passed to
// setCassandraConf earlier) and with the pushdown option set to "true".
val createDDL = """CREATE TEMPORARY VIEW words
     USING org.apache.spark.sql.cassandra
     OPTIONS (
     table "measures",
     keyspace "dev",
     cluster "cluster",
     pushdown "true")"""
// Execute the statement so the view can be queried with spark.sql.
spark.sql(createDDL)

[]

In [32]:
// Run both queries against the "words" view in order and print each result:
// first every column, then station_id/value restricted to station2.
Seq(
  "SELECT * from words",
  "SELECT station_id, value from words where station_id='station2'"
).foreach(query => spark.sql(query).show())

+----------+-------------------+--------------------+
|station_id|               time|               value|
+----------+-------------------+--------------------+
|  station1|1960-02-16 09:00:00|81.10724639892578...|
|  station1|1960-02-16 10:00:00|190.1847534179687...|
|  station1|1960-02-16 11:00:00|97.56996917724609...|
|  station1|1960-02-16 12:00:00|124.5751495361328...|
|  station1|1960-02-16 13:00:00|53.32531738281250...|
|  station1|1960-02-16 14:00:00|168.5605316162109...|
|  station1|1960-02-16 15:00:00|146.4290618896484...|
|  station1|1960-02-16 16:00:00|197.0472869873046...|
|  station1|1960-02-16 17:00:00|160.8878173828125...|
|  station1|1960-02-16 18:00:00|136.0335540771484...|
|  station1|1960-02-16 19:00:00|50.38066101074218...|
|  station1|1960-02-16 20:00:00|11.29835891723632...|
|  station1|1960-02-16 21:00:00|23.29217147827148...|
|  station1|1960-02-16 22:00:00|66.18018341064453...|
|  station1|1960-02-16 23:00:00|136.6040191650390...|
|  station1|1960-02-17 00:00

In [10]:
// Tear down: stop the SparkContext created above, then the SparkSession.
sc.stop()
spark.stop()