
Quick Start Guide

This guide will walk you through quick examples that use the Spark-Riak Connector to read and write data using Java, Scala, and Python. We assume you are following this guide on Mac OS X.

Prerequisites

  • Update brew with brew update.
  • Install the Riak TS OS X build. Instructions can be found here.
  • Set open file limits for Riak by following the guide here.
  • Install Spark with brew install apache-spark.

Start Riak TS with riak start.
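You can check that the node is up with riak ping, which should respond with pong.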

Scroll down to find the desired quick start guide: Scala, Python, or Java.

Scala

In this quick start guide we will run you through some example usages of the Spark-Riak Connector using the Spark Scala REPL.

Start Spark Scala REPL:
path/to/spark-shell \
--conf spark.riak.connection.host=127.0.0.1:8087 \
--packages com.basho.riak:spark-riak-connector_{{scala-version}}:{{connector-version}}

Note: you will need to change {{scala-version}} and {{connector-version}} as described in getting the connector.
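For example, with a Spark build on Scala 2.11 and connector version 2.1.0 (illustrative values only; use the versions described there), the packages flag would read --packages com.basho.riak:spark-riak-connector_2.11:2.1.0.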

Import the following:

import com.basho.riak.spark._
Create an RDD with some test data and save to Riak KV bucket:
val data = Array(1, 2, 3, 4, 5)
val testRDD = sc.parallelize(data)
testRDD.saveToRiak("kv_bucket_a")

When saving RDDs that only contain values (i.e. not key-value pairs) to Riak, a key will be automatically generated for each value in the RDD.
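
To inspect the generated keys from Spark, you can read the bucket back as key/value pairs; this is a quick sketch using the same riakBucket[(String, String)] API shown later in this guide:
// Read the bucket back as (key, value) pairs; the keys were auto-generated by the connector
val kvRDD = sc.riakBucket[(String, String)]("kv_bucket_a").queryAll()
kvRDD.collect().foreach { case (k, v) => println(s"$k -> $v") }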

Check the stored data in Riak KV

We will use the Riak HTTP API together with curl.

First, since the keys were auto-generated, we need to get a list of them:

curl http://127.0.0.1:10018/buckets/kv_bucket_a/keys?keys=true
{"keys":["BJPJNz0Z0BM3hDJl1NzqK8NiCWB","8BFvplqS1yy8GMqRSrFuHHYbpiU","TmftdRwApjIPY9TVbN6LVeGy3XW","ZPHdGrfFmT4MjD8Vkeg31B2EGsK","RINL3QJYSWF4pinoxXV5bQeoJQL"]}bash-3.2$ 

Then, knowing a key, we can fetch the corresponding value:

curl -v http://127.0.0.1:10018/buckets/kv_bucket_a/keys/8BFvplqS1yy8GMqRSrFuHHYbpiU
*   Trying 127.0.0.1...
* Connected to 127.0.0.1 (127.0.0.1) port 10018 (#0)
> GET /buckets/kv_bucket_a/keys/8BFvplqS1yy8GMqRSrFuHHYbpiU HTTP/1.1
> Host: 127.0.0.1:10018
> User-Agent: curl/7.43.0
> Accept: */*
> 
< HTTP/1.1 200 OK
< X-Riak-Vclock: a85hYGBgzGDKBVI8R4M2cps+5+VnYLRozGBKZMxjZfAQS7jIlwUA
< Vary: Accept-Encoding
< Server: MochiWeb/1.1 WebMachine/1.10.8 (that head fake, tho)
< Link: </buckets/kv_bucket_a>; rel="up"
< Last-Modified: Wed, 28 Sep 2016 10:24:08 GMT
< ETag: "79GJcc4X0u5t7SJKwglAyT"
< Date: Wed, 28 Sep 2016 10:36:18 GMT
< Content-Type: application/json
< Content-Length: 1
< 
* Connection #0 to host 127.0.0.1 left intact
5
Query Riak KV bucket and print results:
val queryRDD = sc.riakBucket[String]("kv_bucket_a").queryAll()
queryRDD.collect().foreach(println)

You should see:

scala> queryRDD.collect().foreach(println)
5
4
3
1
2

You will notice that all values (1,2,3,4,5) are printed.

Create a key-value pair RDD and save it to Riak KV bucket:
val data = Array((1,2), (2,4), (3,6), (4,8), (5,10))
val testRDD = sc.parallelize(data)
testRDD.saveToRiak("kv_bucket_b")
Query Riak KV bucket and print results:
val queryRDD = sc.riakBucket[String]("kv_bucket_b").queryAll()
queryRDD.collect().foreach(println)

You should see:

scala> queryRDD.collect().foreach(println)
4
6
10
2
8

You will notice only 2, 4, 6, 8, and 10 are printed. When using tuples, the first value is the key and the second value is the value.

Query Riak KV bucket by keys:
val keyRDD = sc.riakBucket[String]("kv_bucket_b").queryBucketKeys("1", "2", "3")
keyRDD.collect().foreach(println)

You should see:

scala> keyRDD.collect().foreach(println)
4
6
2

You will notice that this prints the values 2, 4, and 6.

Query Riak KV bucket including keys and print results:
val queryRDD = sc.riakBucket[(String, String)]("kv_bucket_b").queryAll()
queryRDD.collect().foreach(println)

You should see:

(5,10)
(4,8)
(3,6)
(2,4)
(1,2)
Query Riak KV bucket by secondary indices (2i)

You can also query by a secondary index range:

val rangeRDD = sc.riakBucket[String]("kv_bucket_b").query2iRange("myIndex", 1L, 5000L)
rangeRDD.collect().foreach(println)

Since none of our test data had a secondary index associated with it, our rangeRDD will be empty.

Create a Spark SQL DataFrame and write it to a Riak TS table.

Make the following imports:

import org.apache.spark.sql.SaveMode
import java.sql.Timestamp
import com.basho.riak.spark.rdd.connector.RiakConnector

Then, set up implicits for Spark sqlContext:

import sqlContext.implicits._
Create an RDD with some timeseries data:
val testRDD = sc.parallelize(Seq(
  (1, "f", Timestamp.valueOf("1980-1-1 10:00:00"), "v1"),
  (1, "f", Timestamp.valueOf("1980-1-1 10:10:00"), "v2"),
  (1, "f", Timestamp.valueOf("1980-1-1 10:20:00"), "v3")))
Convert the RDD to a Spark SQL DataFrame and look at the schema:
val df = testRDD.toDF("k", "family", "ts", "value")
df.printSchema()
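
You should see a schema similar to this (nullability flags can vary slightly between Spark versions):

root
 |-- k: integer (nullable = false)
 |-- family: string (nullable = true)
 |-- ts: timestamp (nullable = true)
 |-- value: string (nullable = true)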
Create a TS table with the same data format as the testRDD that we created:
val tableName = "ts_table_c"
val connector = RiakConnector(sc.getConf)

connector.withSessionDo(session => {
  val request = new com.basho.riak.client.api.commands.timeseries.Query.Builder(
    s"""
      |   CREATE TABLE $tableName  (
      |       k       SINT64    not null,
      |       family  VARCHAR   not null,
      |       ts      TIMESTAMP not null,
      |       value   VARCHAR,
      |
      |       primary key ((k, family, quantum(ts,1,h)), k, family, ts)
      |   )
      |
    """.stripMargin)
    .build()

  val response = session.execute(request)
})
Write the Spark SQL DataFrame to the newly created Riak TS table:
df.write.format("org.apache.spark.sql.riak").mode(SaveMode.Append).save(tableName)

Finally, check that the data was successfully written to the Riak TS table by running a simple query and printing the result:

val test_query = "ts >= CAST('1980-1-1 10:00:00' AS TIMESTAMP) AND ts <= CAST('1980-1-1 10:30:00' AS TIMESTAMP) AND k = 1 AND family = 'f'"

val df2 = sqlContext.read.format("org.apache.spark.sql.riak").load(tableName).filter(test_query)

df2.show()
Stop Spark context

Before exiting the Spark shell, it is good practice to stop the Spark context:

sc.stop()

Note: if you exit the shell with Ctrl+D, the Spark context is stopped automatically; if you exit with Ctrl+C, it is not.

Python

In this quick start guide we will run through some example usages of the Spark-Riak Connector using the Spark Python REPL, pyspark. Please note that Python currently only works with TS tables in Riak TS. We currently do not support Python with KV buckets in Riak KV or Riak TS.

Start pyspark with:
/path/to/bin/pyspark \
--conf spark.riak.connection.host=127.0.0.1:8087 \
--packages com.basho.riak:spark-riak-connector_{{scala-version}}:{{connector-version}}

Note: you will need to change {{scala-version}} and {{connector-version}} as described in getting the connector.

Make the required imports (riak, time, random, and SQLContext are used later in this guide):
import riak, time, random
import pyspark
import pyspark_riak
from pyspark.sql import SQLContext
Patch SparkContext instance to enable Riak APIs:
pyspark_riak.riak_context(sc)
Create an RDD with some test data and save to Riak KV bucket:
source_data = [{"key1":{"pr_key":"pr_val1"}}, {"key2":{"pr_key":"pr_val2"}}]
source_rdd = sc.parallelize(source_data, 1)
source_rdd.saveToRiak("test-python-bucket", "default")
Make a full bucket read query:
rdd = sc.riakBucket("test-python-bucket", "default").queryAll()

This will return an RDD of key-value pairs. Filter out the values and print them:

data = rdd.collect()
values = map(lambda x: x[1], data)
for e in values:
  print e
Query Riak KV bucket by keys:
keyRDD = sc.riakBucket("test-python-bucket", "default").queryBucketKeys("key1", "key2")
data = keyRDD.collect()
values = map(lambda x: x[1], data)
for e in values:
  print e
Query data by a secondary index (2i) range (as in the Scala example, the result will be empty because our test data has no secondary index):
rangeRDD = sc.riakBucket("test-python-bucket", "default").query2iRange("myIndex", 1L, 5000L)
data = rangeRDD.collect()
values = map(lambda x: x[1], data)
for e in values:
  print e

Now let's work with a Riak TS table.

Set up Riak TS connection:
host='127.0.0.1'
pb_port = '8087'
hostAndPort = ":".join([host, pb_port])
client = riak.RiakClient(host=host, pb_port=pb_port)
table_name = 'pyspark-%d' % int(time.time())
table = client.table(table_name)
Create a TS table:
create_sql = """CREATE TABLE %(table_name)s (
site varchar not null,
species varchar not null,
measurementDate timestamp not null,
value double,
PRIMARY KEY ((site, species, quantum(measurementDate, 24, h)),
    site, species, measurementDate))
""" % ({'table_name': table_name})
result = table.query(create_sql)
Print the schema:
schema = table.describe().rows
for r in schema:
    print r

You should see:

['site', 'varchar', False, 1L, 1L]
['species', 'varchar', False, 2L, 2L]
['measurementDate', 'timestamp', False, 3L, 3L]
['value', 'double', True, None, None]
Generate and print some data:
site = 'AA'
species = 'fff'
start_date = int(time.time())
events = []
for i in range(9):
    measurementDate = start_date + i
    value = random.uniform(-20, 110)
    events.append([site, species, measurementDate, value])

end_date = measurementDate

for e in events:
    print e

You should see something like this:

['AA', 'fff', 1460147465, 84.2863373359625]
['AA', 'fff', 1460147466, 22.460677478919976]
['AA', 'fff', 1460147467, 99.44873894866066]
['AA', 'fff', 1460147468, 79.22655985587694]
['AA', 'fff', 1460147469, 20.37795468066598]
['AA', 'fff', 1460147470, 77.30363887094994]
['AA', 'fff', 1460147471, 77.48514266033274]
['AA', 'fff', 1460147472, 78.94730225284083]
['AA', 'fff', 1460147473, 29.09084815136098]
Create an RDD from the generated data:
testRDD = sc.parallelize(events)
Convert RDD to a DataFrame and rename the columns to match the Riak TS table:
df = testRDD.toDF(['site', 'species','measurementDate','value'])
df.show()

You should see something like this:

+----+-------+---------------+------------------+
|site|species|measurementDate|             value|
+----+-------+---------------+------------------+
|  AA|    fff|     1460147465|  84.2863373359625|
|  AA|    fff|     1460147466|22.460677478919976|
|  AA|    fff|     1460147467| 99.44873894866066|
|  AA|    fff|     1460147468| 79.22655985587694|
|  AA|    fff|     1460147469| 20.37795468066598|
|  AA|    fff|     1460147470| 77.30363887094994|
|  AA|    fff|     1460147471| 77.48514266033274|
|  AA|    fff|     1460147472| 78.94730225284083|
|  AA|    fff|     1460147473| 29.09084815136098|
+----+-------+---------------+------------------+
Write the DataFrame to the TS table:
df.write \
    .format('org.apache.spark.sql.riak') \
    .option('spark.riak.connection.host', hostAndPort) \
    .mode('Append') \
    .save(table_name)

Let's check that the write was successful by reading the TS table into a new DataFrame:

sqlContext = SQLContext(sc)
df2 = sqlContext.read\
    .format("org.apache.spark.sql.riak")\
    .option("spark.riak.connection.host", hostAndPort)\
    .option("spark.riakts.bindings.timestamp", "useLong")\
    .load(table_name)\
    .filter("""measurementDate > %(start_date)s
        AND measurementDate <  %(end_date)s
        AND site = '%(site)s'
        AND species = '%(species)s'
    """ % ({'start_date': start_date, 'end_date': end_date, 'site': site, 'species': species}))
Print the table schema:
df2.printSchema()

You should see:

root
 |-- site: string (nullable = false)
 |-- species: string (nullable = false)
 |-- measurementDate: long (nullable = false)
 |-- value: double (nullable = true)
Show the DataFrame:
df2.show()

You should see something like this (the first and last events are excluded because the filter uses strict inequalities):

+----+-------+---------------+------------------+
|site|species|measurementDate|             value|
+----+-------+---------------+------------------+
|  AA|    fff|     1460147466|22.460677478919976|
|  AA|    fff|     1460147467| 99.44873894866066|
|  AA|    fff|     1460147468| 79.22655985587694|
|  AA|    fff|     1460147469| 20.37795468066598|
|  AA|    fff|     1460147470| 77.30363887094994|
|  AA|    fff|     1460147471| 77.48514266033274|
|  AA|    fff|     1460147472| 78.94730225284083|
+----+-------+---------------+------------------+
Register the DataFrame as a temporary SQL table and run a SQL query to obtain the average of the "value" column:
df2.registerTempTable("pyspark_tmp")
sqlContext.sql("select avg(value) as average_value from pyspark_tmp").show()

You should see something similar to this:

+-----------------+
|    average_value|
+-----------------+
|65.03571639260672|
+-----------------+

Before exiting the PySpark shell, you should stop the Spark context:

sc.stop()

Java

Coming Soon!