# Aerospike Spark Connector Tutorial for Scala

## Tested with Spark connector 2.8.0, Java 8, Apache Spark 2.4.0, Python 3.7, Scala 2.11.12, and Spylon (https://pypi.org/project/spylon-kernel/)


In [1]:
%%init_spark 
launcher.jars = ["aerospike-spark-assembly-2.8.0.jar"] 
launcher.master = "local[*]"

In [2]:
//Specify the Seed Host of the Aerospike Server
val AS_HOST = "172.16.39.169:3000"

Intitializing Scala interpreter ...

Spark Web UI available at http://192.168.1.2:4040
SparkContext available as 'sc' (version = 2.4.0, master = local[*], app id = local-1620415074373)
SparkSession available as 'spark'


AS_HOST: String = 172.16.39.169:3000


In [3]:
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SaveMode

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SaveMode


## Schema in the Spark Connector

- Aerospike is schemaless, whereas Spark adheres to a schema. Once the schema is decided upon (either through inference or provided by the user), data within the bins must honor the types.

- To infer the schema, the connector samples a set of records (configurable through `aerospike.schema.scan`) to decide the names of bins/columns and their types. This implies that the derived schema depends entirely upon the sampled records.

- **A user-provided schema does not contain `__key` unless you add it. To query using `__key`, add `__key` to the provided schema with the appropriate type. Metadata columns such as `__gen` or `__ttl` can be added in the same way.**
         
      val schemaWithPK: StructType = new StructType(Array(
                StructField("__key",IntegerType, nullable = false),    
                StructField("id", IntegerType, nullable = false),
                StructField("name", StringType, nullable = false),
                StructField("age", IntegerType, nullable = false),
                StructField("salary",IntegerType, nullable = false)))
                
- **We recommend that you provide schema for queries that involve complex data types such as lists, maps, and mixed types. Using schema inference for CDT may cause unexpected issues.** 
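
To see why an inferred schema depends entirely on the sampled records, consider the following toy model in plain Scala. It is only a sketch and not the connector's implementation: `InferenceSketch`, `inferSchema`, the `Map[String, Any]` record representation, the type names, and the "first type seen wins" rule are all illustrative assumptions.

```scala
// Toy model of sampling-based schema inference (NOT the connector's code).
// Records are modelled as bin-name -> value maps; all names are hypothetical.
object InferenceSketch {
  type Record = Map[String, Any]

  // Map a sampled value to a coarse type name.
  def typeName(value: Any): String = value match {
    case _: Int | _: Long => "long"
    case _: String        => "string"
    case _: Double        => "double"
    case _                => "unknown"
  }

  // Derive bin -> type from the first `sampleSize` records only.
  // Assumption for this sketch: the first type seen for a bin wins.
  def inferSchema(records: Seq[Record], sampleSize: Int): Map[String, String] =
    records.take(sampleSize).foldLeft(Map.empty[String, String]) { (schema, rec) =>
      rec.foldLeft(schema) { case (acc, (bin, value)) =>
        if (acc.contains(bin)) acc else acc + (bin -> typeName(value))
      }
    }
}

val records: Seq[InferenceSketch.Record] = Seq(
  Map("one" -> 1, "two" -> "1"),
  Map("one" -> 2, "two" -> 2),
  Map("one" -> 3, "two" -> "3", "three" -> 3.0)
)

// A small sample never sees bin "three"; a larger sample does.
val fromSmallSample = InferenceSketch.inferSchema(records, sampleSize = 1)
val fromFullSample  = InferenceSketch.inferSchema(records, sampleSize = 3)
```

In this sketch the small sample yields a schema without `three`, and bin `two` is typed as a string even though some records hold integers there. This is the kind of surprise that providing an explicit schema avoids.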
          

### Flexible schema inference 

Spark assumes that the underlying data store (Aerospike in this case) follows a strict schema for all the records within a table. However, Aerospike is a NoSQL database and is schemaless. Hence a single bin (mapped to a column) within a set (mapped to a table) can hold values of multiple Aerospike-supported types. The Spark connector reconciles this incompatibility using the rules described below. Choose the configuration that suits your use case. The strict configuration (`aerospike.schema.flexible = false`) is intended for data that has been modeled in Aerospike to adhere to a strict schema, i.e. each record within the set has the same schema.


#### aerospike.schema.flexible = true (default) 
   
  If none of the column types in the user-specified schema match the bin types of a record in Aerospike, a record with NULLs is returned in the result set. 

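Use `filter()` in Spark to filter out NULL records. For example, `df.filter("gender IS NOT NULL").show(false)`, where `df` is a DataFrame and `gender` is a column that may contain NULLs. Note that a comparison such as `gender == NULL` never evaluates to true in Spark SQL, so use `IS NULL` or `IS NOT NULL` instead.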

If the mismatch is limited to only some of the columns in the user-specified schema, NULL is returned for just those columns in the result set. **Note: at this point there is no way to tell apart a NULL caused by a missing value in the original data set from a NULL caused by a type mismatch. Hence, the user has to treat all NULLs as missing values.** Columns that are not part of the schema are automatically filtered out of the result set by the connector.

Please note that if any field is set to NOT nullable i.e. nullable = false, your query will error out if there’s a type mismatch between an Aerospike bin and the column type specified in the user-specified schema.
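
The rules above can be pictured with a small model in plain Scala. This is a sketch under stated assumptions, not the connector's code: `FlexibleSchemaSketch`, `Column`, `toRow`, and the way records are represented are all hypothetical, and a missing bin is simply treated as NULL here.

```scala
// Toy model of the flexible/strict rules (NOT the connector's code).
// All names and the record representation are illustrative assumptions.
object FlexibleSchemaSketch {
  sealed trait ColType
  case object IntCol extends ColType
  case object StrCol extends ColType

  final case class Column(name: String, colType: ColType, nullable: Boolean = true)

  private def matches(colType: ColType, value: Any): Boolean = (colType, value) match {
    case (IntCol, _: Int)    => true
    case (StrCol, _: String) => true
    case _                   => false
  }

  // Convert one record to a row. Bins outside the schema are dropped.
  // flexible = true : a type mismatch becomes None (NULL),
  //                   unless the column is declared non-nullable.
  // flexible = false: any type mismatch raises an error.
  // A bin that is absent from the record becomes None in this sketch.
  def toRow(schema: Seq[Column], record: Map[String, Any], flexible: Boolean): Seq[Option[Any]] =
    schema.map { col =>
      record.get(col.name) match {
        case Some(v) if matches(col.colType, v) => Some(v)
        case Some(v) if !flexible || !col.nullable =>
          throw new IllegalArgumentException(s"type mismatch in column '${col.name}': $v")
        case _ => None
      }
    }
}

import FlexibleSchemaSketch._

val sketchSchema = Seq(Column("one", IntCol), Column("two", IntCol))
val evenRecord   = Map[String, Any]("one" -> 2, "two" -> 2)
val oddRecord    = Map[String, Any]("one" -> 1, "two" -> "1", "extra" -> "ignored")

val evenRow   = toRow(sketchSchema, evenRecord, flexible = true)  // both columns match
val oddRow    = toRow(sketchSchema, oddRecord, flexible = true)   // "two" mismatches
val strictOdd = scala.util.Try(toRow(sketchSchema, oddRecord, flexible = false))
```

With `flexible = true` the mismatching column comes back as `None`, which is indistinguishable from a genuinely missing value. With `flexible = false` the same record makes the conversion fail.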
  

### Create sample data to demonstrate flexible schema inference

In [4]:
import com.aerospike.client.policy.WritePolicy
import com.aerospike.spark.sql.AerospikeConnection
import org.apache.spark.sql.SparkSession
import com.aerospike.client.{AerospikeClient, AerospikeException, Bin, Key}

val conf = sc.getConf.clone();

conf.set("aerospike.seedhost" , AS_HOST)
conf.set("aerospike.schema.flexible" , "true") //by default it is always true

val client = AerospikeConnection.getClient(conf)
val flexsetname = "flexschema"
val wp = new WritePolicy()
    wp.expiration = 6000 // expire data in 6000 seconds (100 minutes)
    for (i <- 1 to 100) {
      val key = new Key("test", flexsetname, i)
      client.delete(null, key )
      if( i %2 ==0){
        client.put(wp, key, new Bin("one", i.toInt), new Bin("two", i.toInt))
      }else{
        client.put(wp, key, new Bin("one", i.toInt), new Bin("two", i.toString))
      }
    }


conf.set("aerospike.keyPath", "/etc/aerospike/features.conf")
conf.set("aerospike.namespace", "test")
spark.close()

val spark2= SparkSession.builder().config(conf).master("local[2]").getOrCreate()
val flexibleSchema= StructType (
      Seq(
        StructField("one", IntegerType, true ),
        StructField("two", IntegerType, true )
      )
    )

spark2.read.format("aerospike").schema(flexibleSchema).option("aerospike.set", flexsetname).load().show()

//Please note that records with an odd value of `one` stored `two` as a string, so column `two` is set to null for them due to the type mismatch

+---+----+
|one| two|
+---+----+
| 82|  82|
| 67|null|
| 29|null|
| 39|null|
| 16|  16|
| 34|  34|
|  1|null|
| 77|null|
| 52|  52|
| 27|null|
| 25|null|
| 11|null|
| 15|null|
| 96|  96|
| 97|null|
|  4|   4|
| 89|null|
| 14|  14|
| 79|null|
| 71|null|
+---+----+
only showing top 20 rows



import com.aerospike.client.policy.WritePolicy
import com.aerospike.spark.sql.AerospikeConnection
import org.apache.spark.sql.SparkSession
import com.aerospike.client.{AerospikeClient, AerospikeException, Bin, Key}
conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@2d5fbbac
client: com.aerospike.client.AerospikeClient = com.aerospike.client.AerospikeClient@36f19faf
flexsetname: String = flexschema
wp: com.aerospike.client.policy.WritePolicy = com.aerospike.client.policy.WritePolicy@6a18a2f1
wp.expiration: Int = 6000
spark2: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@2876f127
flexibleSchema: org.apache.spark.sql.types.StructType = StructType(StructField(one,IntegerType,true), StructField(two,IntegerType,true))


#### aerospike.schema.flexible = false 

If a mismatch between the user-specified schema and the schema of a record in Aerospike is detected at the bin/column level, your query will error out.


In [5]:
//When strict matching is set, we will get an exception due to type mismatch with schema provided.
import scala.util.Try

val df = Try{
    spark2.sqlContext.read.
    format("aerospike").
    schema(flexibleSchema).
    option("aerospike.schema.flexible", "false").
    option("aerospike.set", flexsetname).
    load().show()

}            

2021-05-07 12:18:01 ERROR Executor:91 - Exception in task 0.0 in stage 4.0 (TID 49)
com.aerospike.spark.sql.TypeConverter$TypeMismatchException
	at com.aerospike.spark.sql.TypeConverter$.convertToSparkType(TypeConverter.scala:243)
	at com.aerospike.spark.sql.TypeConverter$.binToValue(TypeConverter.scala:314)
	at com.aerospike.spark.sql.RowIterator$$anonfun$23.apply(KeyRecordRDD.scala:638)
	at com.aerospike.spark.sql.RowIterator$$anonfun$23.apply(KeyRecordRDD.scala:627)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
	at com.aerospike.spark.sql.RowIterator.get(KeyRecordRDD.

import scala.util.Try
df: scala.util.Try[Unit] =
Failure(org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 49, localhost, executor driver): com.aerospike.spark.sql.TypeConverter$TypeMismatchException
	at com.aerospike.spark.sql.TypeConverter$.convertToSparkType(TypeConverter.scala:243)
	at com.aerospike.spark.sql.TypeConverter$.binToValue(TypeConverter.scala:314)
	at com.aerospike.spark.sql.RowIterator$$anonfun$23.apply(KeyRecordRDD.scala:638)
	at com.aerospike.spark.sql.RowIterator$$anonfun$23.apply(KeyRecordRDD.scala:627)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.sc...

## Create sample data and write it into Aerospike Database

In [6]:
//Create test data

val num_records=1000
val rand = scala.util.Random

//schema of input data
// val spark = SparkSession.builder().config(strictConf).master("local[*]").getOrCreate()
val schema: StructType = new StructType(
    Array(
    StructField("id", IntegerType, nullable = false),
    StructField("name", StringType, nullable = false),
    StructField("age", IntegerType, nullable = false),
    StructField("salary",IntegerType, nullable = false)
  ))

val inputDF = {
    val inputBuf=  new ArrayBuffer[Row]()
    for ( i <- 1 to num_records){
        val name = "name"  + i
        val age = i%100
        val salary = 50000 + rand.nextInt(50000)
        val id = i 
        val r = Row(id, name, age,salary)
        inputBuf.append(r)
    }
    val inputRDD = spark2.sparkContext.parallelize(inputBuf.toSeq)
    spark2.createDataFrame(inputRDD,schema)
}

inputDF.show(10)

//Write the Sample Data to Aerospike
inputDF.write.mode(SaveMode.Overwrite) 
.format("aerospike") //aerospike specific format
.option("aerospike.writeset", "scala_input_data") //write to this set
.option("aerospike.updateByKey", "id") //indicates which columns should be used for construction of primary key
.option("aerospike.sendKey", "true")
.save()

+---+------+---+------+
| id|  name|age|salary|
+---+------+---+------+
|  1| name1|  1| 63874|
|  2| name2|  2| 80652|
|  3| name3|  3| 89869|
|  4| name4|  4| 90393|
|  5| name5|  5| 54456|
|  6| name6|  6| 97832|
|  7| name7|  7| 56316|
|  8| name8|  8| 87378|
|  9| name9|  9| 99287|
| 10|name10| 10| 80446|
+---+------+---+------+
only showing top 10 rows



num_records: Int = 1000
rand: util.Random.type = scala.util.Random$@656ebfcb
schema: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,false), StructField(name,StringType,false), StructField(age,IntegerType,false), StructField(salary,IntegerType,false))
inputDF: org.apache.spark.sql.DataFrame = [id: int, name: string ... 2 more fields]


### Insert data using SQL insert statements

In [7]:
/*
Aerospike DB needs a primary key for record insertion. Hence, you must identify the primary key column,
for example with .option("aerospike.updateByKey", "id"), where "id" is the name of the column that you'd
like to be the primary key, while loading data from the DB.
*/
val insertDFWithSchema=spark2
.sqlContext
.read
.format("aerospike")
.schema(schema)
.option("aerospike.updateByKey", "id") //required for sql inserts 
.option("aerospike.set", "scala_input_data")
.load()

val sqlView="inserttable"
insertDFWithSchema.createOrReplaceTempView(sqlView)
//
//V2 datasource doesn't allow insert into a view. 
//

// spark.sql(s"insert into $sqlView  values (20000, 'insert_record1', 200, 23000), (20001, 'insert_record2', 201, 23001)")

// spark
// .sqlContext
// .read
// .format("aerospike")
// .schema(schema)
// .option("aerospike.seedhost",AS_HOST)
// .option("aerospike.featurekey", "/etc/aerospike/features.conf") 
// .option ("aerospike.namespace", "test")
// .option("aerospike.set", "input_data").load.where("id >2000").show()
spark2.sql(s"select * from $sqlView").show()

+---+-------+---+------+
| id|   name|age|salary|
+---+-------+---+------+
|132|name132| 32| 96194|
|647|name647| 47| 81584|
| 45| name45| 45| 52189|
|558|name558| 58| 89307|
|608|name608|  8| 70123|
|687|name687| 87| 57574|
|372|name372| 72| 52430|
|335|name335| 35| 70880|
|911|name911| 11| 89581|
|352|name352| 52| 77206|
| 94| name94| 94| 58512|
|890|name890| 90| 85952|
|334|name334| 34| 59674|
|907|name907|  7| 79228|
|148|name148| 48| 97284|
|315|name315| 15| 72954|
|163|name163| 63| 77421|
|882|name882| 82| 57553|
|602|name602|  2| 76237|
|673|name673| 73| 50336|
+---+-------+---+------+
only showing top 20 rows



insertDFWithSchema: org.apache.spark.sql.DataFrame = [id: int, name: string ... 2 more fields]
sqlView: String = inserttable


## Load data into a DataFrame without specifying any schema i.e. using connector schema inference

In [8]:
// Create a Spark DataFrame by using the Connector Schema inference mechanism

val loadedDFWithoutSchema=spark2
.sqlContext
.read
.format("aerospike")
.option("aerospike.set", "scala_input_data") //read the data from this set
.load
loadedDFWithoutSchema.printSchema()
//Notice that schema of loaded data has some additional fields. 
// When connector infers schema, it also adds internal metadata.

root
 |-- __key: string (nullable = true)
 |-- __digest: binary (nullable = true)
 |-- __expiry: integer (nullable = false)
 |-- __generation: integer (nullable = false)
 |-- __ttl: integer (nullable = false)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- salary: long (nullable = true)
 |-- id: long (nullable = true)



loadedDFWithoutSchema: org.apache.spark.sql.DataFrame = [__key: string, __digest: binary ... 7 more fields]


## Load data into a DataFrame with user specified schema 

In [9]:
//Data can be loaded with known schema as well.
val loadedDFWithSchema=spark2
.sqlContext
.read
.format("aerospike")
.schema(schema)
.option("aerospike.set", "scala_input_data").load
loadedDFWithSchema.show(5)

+---+-------+---+------+
| id|   name|age|salary|
+---+-------+---+------+
|132|name132| 32| 96194|
|647|name647| 47| 81584|
| 45| name45| 45| 52189|
|558|name558| 58| 89307|
|608|name608|  8| 70123|
+---+-------+---+------+
only showing top 5 rows



loadedDFWithSchema: org.apache.spark.sql.DataFrame = [id: int, name: string ... 2 more fields]


## Writing Sample Collection Data Types (CDT) data into Aerospike

In [10]:
val complex_data_json="resources/nested_data.json"
val alias=  StructType(List(
    StructField("first_name",StringType, false),
    StructField("last_name",StringType, false)))

  val name= StructType(List(
    StructField("first_name",StringType, false),
    StructField("aliases",ArrayType(alias), false )
  ))

  val street_adress= StructType(List(
    StructField("street_name", StringType, false),
    StructField("apt_number" , IntegerType, false)))

  val address = StructType( List(
    StructField ("zip" , LongType, false),
    StructField("street", street_adress, false),
    StructField("city", StringType, false)))

  val workHistory = StructType(List(
    StructField ("company_name" , StringType, false),
    StructField( "company_address" , address, false),
    StructField("worked_from", StringType, false)))

  val person=  StructType ( List(
    StructField("name" , name, false, Metadata.empty),
    StructField("SSN", StringType, false,Metadata.empty),
    StructField("home_address", ArrayType(address), false),
    StructField("work_history", ArrayType(workHistory), false)))

val cmplx_data_with_schema=spark2.read.schema(person).json(complex_data_json)

cmplx_data_with_schema.printSchema()
cmplx_data_with_schema.write.mode(SaveMode.Overwrite) 
.format("aerospike") //aerospike specific format
.option("aerospike.seedhost", AS_HOST) //db hostname, can be added multiple hosts, delimited with ":"
.option("aerospike.namespace", "test") //use this namespace 
.option("aerospike.writeset", "scala_complex_input_data") //write to this set
.option("aerospike.updateByKey", "name.first_name") //indicates which columns should be used for construction of primary key
.save()

root
 |-- name: struct (nullable = true)
 |    |-- first_name: string (nullable = true)
 |    |-- aliases: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- first_name: string (nullable = true)
 |    |    |    |-- last_name: string (nullable = true)
 |-- SSN: string (nullable = true)
 |-- home_address: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- zip: long (nullable = true)
 |    |    |-- street: struct (nullable = true)
 |    |    |    |-- street_name: string (nullable = true)
 |    |    |    |-- apt_number: integer (nullable = true)
 |    |    |-- city: string (nullable = true)
 |-- work_history: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- company_name: string (nullable = true)
 |    |    |-- company_address: struct (nullable = true)
 |    |    |    |-- zip: long (nullable = true)
 |    |    |    |-- street: struct (nullable = true)
 |    |    |    |    

complex_data_json: String = resources/nested_data.json
alias: org.apache.spark.sql.types.StructType = StructType(StructField(first_name,StringType,false), StructField(last_name,StringType,false))
name: org.apache.spark.sql.types.StructType = StructType(StructField(first_name,StringType,false), StructField(aliases,ArrayType(StructType(StructField(first_name,StringType,false), StructField(last_name,StringType,false)),true),false))
street_adress: org.apache.spark.sql.types.StructType = StructType(StructField(street_name,StringType,false), StructField(apt_number,IntegerType,false))
address: org.apache.spark.sql.types.StructType = StructType(StructField(zip,LongType,false), StructField(street,StructType(StructField(street_name,StringType,false), StructField(apt_number,IntegerType,false)),fal...

## Load Complex Data Types (CDT) into a DataFrame with user specified schema

In [11]:
val loadedComplexDFWithSchema=spark2
.sqlContext
.read
.format("aerospike")
.option("aerospike.set", "scala_complex_input_data") //read the data from this set
.schema(person)
.load
loadedComplexDFWithSchema.printSchema()
//Please note the difference in types of loaded data in both cases. With a schema, complex types are resolved exactly.

root
 |-- name: struct (nullable = false)
 |    |-- first_name: string (nullable = false)
 |    |-- aliases: array (nullable = false)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- first_name: string (nullable = false)
 |    |    |    |-- last_name: string (nullable = false)
 |-- SSN: string (nullable = false)
 |-- home_address: array (nullable = false)
 |    |-- element: struct (containsNull = true)
 |    |    |-- zip: long (nullable = false)
 |    |    |-- street: struct (nullable = false)
 |    |    |    |-- street_name: string (nullable = false)
 |    |    |    |-- apt_number: integer (nullable = false)
 |    |    |-- city: string (nullable = false)
 |-- work_history: array (nullable = false)
 |    |-- element: struct (containsNull = true)
 |    |    |-- company_name: string (nullable = false)
 |    |    |-- company_address: struct (nullable = false)
 |    |    |    |-- zip: long (nullable = false)
 |    |    |    |-- street: struct (nullable = false)
 |  

loadedComplexDFWithSchema: org.apache.spark.sql.DataFrame = [name: struct<first_name: string, aliases: array<struct<first_name:string,last_name:string>>>, SSN: string ... 2 more fields]


# Querying Aerospike Data using SparkSQL

### Things to keep in mind
   1. Queries that involve the Primary Key or Digest in the predicate trigger aerospike_batch_get() (https://www.aerospike.com/docs/client/c/usage/kvs/batch.html) and run extremely fast. For example, a query containing `__key` or `__digest` with no `OR` between two bins.
   2. All other queries may entail a full scan of the Aerospike DB if they can't be converted to an Aerospike batchget.
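
As a rough mental model of the two rules above, the following plain Scala sketch classifies a predicate as batchget or scan. It is not the connector's planner: the `Pred` types and `isBatchGet` are hypothetical, and the handling of `And` (a key predicate combined with a bin filter still counts as batchget) is an assumption made for illustration.

```scala
// Toy predicate model (NOT the connector's planner); all names are hypothetical.
object QueryPlanSketch {
  sealed trait Pred
  final case class KeyEq(value: Any) extends Pred                 // __key = v or __digest = v
  final case class KeyIn(values: Seq[Any]) extends Pred           // __key IN (...)
  final case class KeyRange(op: String, value: Any) extends Pred  // e.g. __key > 10
  final case class BinFilter(bin: String) extends Pred            // any predicate on a bin
  final case class And(left: Pred, right: Pred) extends Pred
  final case class Or(left: Pred, right: Pred) extends Pred

  // True when the predicate pins down a finite set of keys,
  // so that a batch read could serve it.
  def isBatchGet(p: Pred): Boolean = p match {
    case KeyEq(_) | KeyIn(_) => true
    case KeyRange(_, _)      => false // only equality tests on keys qualify
    case BinFilter(_)        => false
    case And(l, r)           => isBatchGet(l) || isBatchGet(r) // assumption of this sketch
    case Or(l, r)            => isBatchGet(l) && isBatchGet(r)
  }
}

import QueryPlanSketch._

val pkOnly  = isBatchGet(KeyEq(829))
val pkOrPk  = isBatchGet(Or(KeyIn(1 to 10), KeyIn(12 to 14)))
val pkOrBin = isBatchGet(Or(KeyIn(1 to 10), BinFilter("age")))
val pkRange = isBatchGet(KeyRange(">", 10))
```

Under this model an `OR` between two key predicates stays a batchget, while an `OR` between a key predicate and a bin filter, or a range test on the key, falls back to a scan.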

## Queries that include Primary Key in the Predicate

For batchget queries, filters can also be applied to metadata columns such as `__gen` or `__ttl`. To do so, expose these columns through the schema (if a schema is provided).

In [12]:
val batchGet1= spark2.sqlContext
.read
.format("aerospike")
.option("aerospike.set", "scala_input_data")
.option("aerospike.keyType", "int") //used to hint primary key(PK) type when schema is not provided.
.load.where("__key = 829")
batchGet1.show()
//Please be aware that the Aerospike database supports only equality tests on PKs in primary key queries.
//So, a where clause with "__key >10" would result in a scan query!

+-----+--------------------+--------+------------+-----+-------+---+------+---+
|__key|            __digest|__expiry|__generation|__ttl|   name|age|salary| id|
+-----+--------------------+--------+------------+-----+-------+---+------+---+
|  829|[C0 B6 C4 DE 68 D...|       0|           1|   -1|name829| 29| 79521|829|
+-----+--------------------+--------+------------+-----+-------+---+------+---+



batchGet1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [__key: int, __digest: binary ... 7 more fields]


In [13]:
//In this query we are doing *OR* between PK subqueries 

val somePrimaryKeys= 1.to(10).toSeq
val someMoreKeys= 12.to(14).toSeq
val batchGet2= spark2.sqlContext
.read
.format("aerospike")
.option("aerospike.set", "scala_input_data")
.option("aerospike.keyType", "int") //used to hint primary key(PK) type when inferred without schema.
.load.where((col("__key") isin (somePrimaryKeys:_*)) || ( col("__key") isin (someMoreKeys:_*) ))
batchGet2.show(5)
//We should get 13 records in total.

+-----+--------------------+--------+------------+-----+-----+---+------+---+
|__key|            __digest|__expiry|__generation|__ttl| name|age|salary| id|
+-----+--------------------+--------+------------+-----+-----+---+------+---+
|    1|[89 31 AB FE 54 D...|       0|           1|   -1|name1|  1| 63874|  1|
|    4|[93 F1 65 F0 E8 9...|       0|           1|   -1|name4|  4| 90393|  4|
|    3|[D4 A1 0B A5 12 0...|       0|           1|   -1|name3|  3| 89869|  3|
|    7|[30 94 D4 E7 9E 8...|       0|           1|   -1|name7|  7| 56316|  7|
|    5|[3E F5 94 A9 3A A...|       0|           1|   -1|name5|  5| 54456|  5|
+-----+--------------------+--------+------------+-----+-----+---+------+---+
only showing top 5 rows



somePrimaryKeys: scala.collection.immutable.Range = Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
someMoreKeys: scala.collection.immutable.Range = Range(12, 13, 14)
batchGet2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [__key: int, __digest: binary ... 7 more fields]


## Queries that do not include Primary Key in the Predicate

In [14]:

val somePrimaryKeys= 1.to(10).toSeq
val scanQuery1= spark2.sqlContext
.read
.format("aerospike")
.option("aerospike.set", "scala_input_data")
.option("aerospike.keyType", "int") //used to hint primary key(PK) type when inferred without schema.
.load.where((col("__key") isin (somePrimaryKeys:_*)) || ( col("age") >50 ))

scanQuery1.show()

//Since there is an OR between the PKs and a bin, this will be treated as a scan query.
//Primary keys are not stored in bins (by default), hence only filters corresponding to bins are honored.

+-----+--------------------+--------+------------+-----+-------+---+------+---+
|__key|            __digest|__expiry|__generation|__ttl|   name|age|salary| id|
+-----+--------------------+--------+------------+-----+-------+---+------+---+
|  558|[14 80 A2 9D D2 E...|       0|           1|   -1|name558| 58| 89307|558|
|  687|[1A 30 21 88 39 A...|       0|           1|   -1|name687| 87| 57574|687|
|  372|[1B 40 51 DD 64 F...|       0|           1|   -1|name372| 72| 52430|372|
|  352|[23 A0 99 06 1F 7...|       0|           1|   -1|name352| 52| 77206|352|
|   94|[26 E0 C4 85 CE 9...|       0|           1|   -1| name94| 94| 58512| 94|
|  890|[26 30 F7 1A D3 A...|       0|           1|   -1|name890| 90| 85952|890|
|  163|[3E D0 72 42 15 9...|       0|           1|   -1|name163| 63| 77421|163|
|  882|[3E C0 28 CE F2 5...|       0|           1|   -1|name882| 82| 57553|882|
|  673|[45 10 C1 D6 80 3...|       0|           1|   -1|name673| 73| 50336|673|
|  991|[47 A0 D4 EC 12 1...|       0|   

somePrimaryKeys: scala.collection.immutable.Range = Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scanQuery1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [__key: int, __digest: binary ... 7 more fields]


### batchget query using `__digest`
   - `__digest` can have only two types `BinaryType`(default type) or `StringType`.
   - If schema is not provided and `__digest` is `StringType`, then set `aerospike.digestType` to `string`.
   - Records retrieved with `__digest` batchget call will have null primary key (i.e.`__key` is `null`).
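
When working with `StringType` digests it helps to be able to convert between the raw 20-byte digest and its hex string in both directions. The sketch below uses only the Scala standard library. The helper names `bytesToHex` and `hexToBytes` are illustrative, and the byte values are the digest of key 829 as printed in this notebook's output.

```scala
// Hex helpers for a digest; function names are illustrative.
def bytesToHex(bytes: Array[Byte]): String =
  bytes.map(b => f"${b & 0xff}%02x").mkString

def hexToBytes(hex: String): Array[Byte] = {
  require(hex.length % 2 == 0, "hex string must have an even number of characters")
  hex.grouped(2).map(pair => Integer.parseInt(pair, 16).toByte).toArray
}

// Digest of key 829 in set "scala_input_data", copied from this notebook's output.
val digestBytes: Array[Byte] = Array(-64, -74, -60, -34, 104, -41, 125, 123, -99, -102,
  85, 105, 107, -58, 71, 15, -85, -124, -21, 98).map(_.toByte)

val digestHex = bytesToHex(digestBytes) // c0b6c4de68d77d7b9d9a55696bc6470fab84eb62
val roundTrip = hexToBytes(digestHex)   // same 20 bytes as digestBytes
```

Masking each byte with `0xff` before formatting keeps negative bytes from being sign-extended, so every byte maps to exactly two hex characters.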

In [15]:
import com.aerospike.client.Key
import com.aerospike.spark.utility.HelperFunctions._
val v: Int = 829

def ByteArray2Hex(bytes: Array[Byte]): String = {
    bytes.map("%02x".format(_)).mkString
}

//using schema inference 
val binaryDigest= new Key("test","scala_input_data",v).digest
val stringDigest= ByteArray2Hex(binaryDigest) //convert Array[Byte] digest to hexstring

val batchGetdigest= spark2.sqlContext
.read
.format("aerospike")
.option("aerospike.set", "scala_input_data")
.load.filter($"__digest"=== binaryDigest )
batchGetdigest.show() //note __key is null in retrieved record

val batchGetStringdigest= spark2.sqlContext
.read
.format("aerospike")
.option("aerospike.set", "scala_input_data")
.option("aerospike.digestType", "string") //specify digestType if using schema inference 
.load.filter($"__digest"=== stringDigest )
batchGetStringdigest.show()

//using user provided schema, with BinaryType digest
//Note that the retrieved records __key field is null. 
val binaryDigestSchema: StructType = new StructType(
    Array(
    StructField("id", IntegerType, nullable = false),
    StructField("name", StringType, nullable = false),
    StructField("age", IntegerType, nullable = false),
    StructField("__digest",BinaryType, nullable = false)  //note to query using digest, schema should have this field
  ))
val dfBinaryDigest=spark2.sqlContext
.read
.format("aerospike")
.schema(binaryDigestSchema)
.option("aerospike.set", "scala_input_data")
.load.filter($"__digest"=== binaryDigest )
dfBinaryDigest.show()

//using user provided schema, with hex string digest
val stringDigestSchema: StructType = new StructType(
    Array(
    StructField("id", IntegerType, nullable = false),
    StructField("name", StringType, nullable = false),
    StructField("age", IntegerType, nullable = false),
    StructField("__digest",StringType, nullable = false)  //note the type of digest here!
  ))
val dfStringDigest=spark2.sqlContext
.read
.format("aerospike")
.schema(stringDigestSchema)
.option("aerospike.set", "scala_input_data")
.load.filter($"__digest"=== stringDigest )
dfStringDigest.show()


//Please note that in a scan call, if data was written using sendKey=true, then the __key value will be retrieved.
val dfBinaryDigestWithScan=spark2.sqlContext
.read
.format("aerospike")
.option("aerospike.set", "scala_input_data")
.load.filter($"__digest"=== binaryDigest || $"id" === 829 )
dfBinaryDigestWithScan.show()

+-----+--------------------+--------+------------+-----+-------+---+------+---+
|__key|            __digest|__expiry|__generation|__ttl|   name|age|salary| id|
+-----+--------------------+--------+------------+-----+-------+---+------+---+
| null|[C0 B6 C4 DE 68 D...|       0|           1|   -1|name829| 29| 79521|829|
+-----+--------------------+--------+------------+-----+-------+---+------+---+

+-----+--------------------+--------+------------+-----+-------+---+------+---+
|__key|            __digest|__expiry|__generation|__ttl|   name|age|salary| id|
+-----+--------------------+--------+------------+-----+-------+---+------+---+
| null|c0b6c4de68d77d7b9...|       0|           1|   -1|name829| 29| 79521|829|
+-----+--------------------+--------+------------+-----+-------+---+------+---+

+---+-------+---+--------------------+
| id|   name|age|            __digest|
+---+-------+---+--------------------+
|829|name829| 29|[C0 B6 C4 DE 68 D...|
+---+-------+---+--------------------+

+-

import com.aerospike.client.Key
import com.aerospike.spark.utility.HelperFunctions._
v: Int = 829
ByteArray2Hex: (bytes: Array[Byte])String
binaryDigest: Array[Byte] = Array(-64, -74, -60, -34, 104, -41, 125, 123, -99, -102, 85, 105, 107, -58, 71, 15, -85, -124, -21, 98)
stringDigest: String = c0b6c4de68d77d7b9d9a55696bc6470fab84eb62
batchGetdigest: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [__key: string, __digest: binary ... 7 more fields]
batchGetStringdigest: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [__key: string, __digest: string ... 7 more fields]
binaryDigestSchema: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,false), StructField(name,StringType,false), StructField(age,IntegerType,false), StructField(__digest,Binary...

## Query with CDT

In [16]:
//Find all people who have had at least 5 jobs in the past.
loadedComplexDFWithSchema
.withColumn("past_jobs", col("work_history.company_name"))
.withColumn("num_jobs", size(col("past_jobs")))
.where(col("num_jobs")  >4).show()

+--------------------+-----------+--------------------+--------------------+--------------------+--------+
|                name|        SSN|        home_address|        work_history|           past_jobs|num_jobs|
+--------------------+-----------+--------------------+--------------------+--------------------+--------+
|[Jamie, [[Patrici...|569-31-4715|[[53379, [James I...|[[Brown, Miller a...|[Brown, Miller an...|       5|
|[Michael, [[Micha...|455-56-8642|[[2300, [Bauer Ov...|[[Harrington, All...|[Harrington, Alle...|       5|
|[Luis, [[David, G...|818-16-1742|[[60659, [Oneill ...|[[Moss-Johnson, [...|[Moss-Johnson, St...|       5|
|[Tami, [[Joseph, ...|001-49-0685|[[23288, [Clark V...|[[Roberts PLC, [4...|[Roberts PLC, Hub...|       5|
|[Krista, [[Robert...|756-24-3462|[[64750, [Thomas ...|[[Baker PLC, [468...|[Baker PLC, Kirk ...|       5|
|[Kristina, [[Vick...|545-62-3152|[[70288, [Rebecca...|[[Vaughn Inc, [20...|[Vaughn Inc, Brow...|       5|
|[Elizabeth, [[And...|394-89-8545|[[4

## Sampling from Aerospike DB

   - Interplay of `aerospike.partition.factor` and `aerospike.sample.size`:
   - `aerospike.partition.factor`: decides how many Spark partitions will be created (default 8, meaning 2^8 Spark partitions; tune it based on the available resources).
   - `aerospike.sample.size`: fetches approximately the specified number of records from the DB (avoids a complete DB scan; the default 0 means fetch everything from the DB).
   - If you need an exact number of records, couple it with `limit`.



In [17]:
//number_of_spark_partitions (num_sp)=2^{aerospike.partition.factor}
//total number of records = Math.ceil((float)aerospike.sample.size/num_sp) * (num_sp) 
//use lower partition factor for more accurate sampling
val setname="scala_input_data"
val sample_size=101

val df3=spark2.read.format("aerospike")
.option("aerospike.partition.factor","2")
.option("aerospike.set",setname)
.option("aerospike.sample.size","101") //samples approximately the specified number of records
.load()

val df4=spark2.read.format("aerospike")
.option("aerospike.partition.factor","6")
.option("aerospike.set",setname)
.option("aerospike.sample.size","101") //samples approximately the specified number of records
.load()

//Note: since 101 cannot be divided evenly across the partitions, we end up fetching more records than requested.
//Notice the variation between the two partition factors.
val count3=df3.count()
val count4=df4.count()

//Note how limit returns exactly 101 records from df4, which has 128 records.
val dfWithLimit=df4.limit(101)
val limitCount=dfWithLimit.count()

setname: String = scala_input_data
sample_size: Int = 101
df3: org.apache.spark.sql.DataFrame = [__key: string, __digest: binary ... 7 more fields]
df4: org.apache.spark.sql.DataFrame = [__key: string, __digest: binary ... 7 more fields]
count3: Long = 104
count4: Long = 128
dfWithLimit: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [__key: string, __digest: binary ... 7 more fields]
limitCount: Long = 101
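
The formula given in the comments of the cell above can be checked with a few lines of plain Scala. This is a sketch: `expectedSampleCount` is a hypothetical helper, the `[0, 15]` range check is taken from the property description in this tutorial, and the real count can be lower when the set holds fewer records than requested.

```scala
// Expected number of sampled records, following the formula in the cell's comments:
//   num_sp = 2^partitionFactor
//   total  = ceil(sampleSize / num_sp) * num_sp
def expectedSampleCount(sampleSize: Int, partitionFactor: Int): Long = {
  require(sampleSize > 0, "sampleSize must be positive (0 means fetch everything)")
  require(partitionFactor >= 0 && partitionFactor <= 15, "partition factor must be in [0, 15]")
  val numPartitions = 1L << partitionFactor
  val perPartition  = (sampleSize + numPartitions - 1) / numPartitions // integer ceiling
  perPartition * numPartitions
}

val expected3 = expectedSampleCount(101, 2) // 4 partitions  -> 26 records each
val expected4 = expectedSampleCount(101, 6) // 64 partitions -> 2 records each
```

The two results, 104 and 128, agree with `count3` and `count4` in the output above, which is why a lower partition factor gives a count closer to the requested sample size.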


## Use Aerospike Spark Connector Configuration properties in the Spark API to improve performance

- aerospike.partition.factor : number of logical Aerospike partitions [0-15]
- aerospike.maxthreadcount : maximum number of threads to use for writing data into Aerospike
- aerospike.compression : compression of Java client-server communication
- aerospike.batchMax : maximum number of records per read request (default 5000)
- aerospike.recordspersecond : same as in the Java client

#### Other
- aerospike.keyType : Primary key type hint for schema inference. Always set it properly if primary key type is not string
- aerospike.digestType : Digest type hint for schema inference. Always set it properly if digest type is not byte[]

See https://www.aerospike.com/docs/connect/processing/spark/reference.html for detailed description of the above properties