# Spark and Aerospike using pyspark


The findspark setup is only needed when running in Jupyter, and it requires `pip install findspark`.
If you are only using the pyspark shell, you can skip it.


In [None]:

# Aerospike Python client, used in Example 1 to write raw test records.
import aerospike
import findspark
# findspark.init() must be called before pyspark is imported; it is only
# needed when running inside Jupyter (the pyspark shell can skip it).
findspark.init()




In Jupyter, `findspark.init()` must be called before `pyspark` is imported.

In [None]:
import pyspark
from pyspark.context import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import LongType, StringType, StructField, StructType




Set up Spark and point it at an Aerospike server running locally on the default port (3000).

In [None]:
# Reuse the active Spark session if one exists (e.g. inside the pyspark shell),
# otherwise create one. The builder is the recommended entry point; `sc` and
# `sqlContext` are still defined because later cells reference them.
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
# NOTE: SQLContext is deprecated in Spark 3.x in favour of SparkSession; it is
# kept only so that code still written against it keeps working.
sqlContext = SQLContext(sc)

# Session-wide Aerospike connector settings: the "test" namespace on a server
# running on localhost, port 3000.
aerospike_conf = {
    "aerospike.namespace": "test",
    "aerospike.seedhost": "localhost",
    "aerospike.port": "3000",
}
for conf_key, conf_value in aerospike_conf.items():
    spark.conf.set(conf_key, conf_value)

## Example 1

A simple example that writes raw data using the Aerospike client, then loads and queries it with Spark.

In [None]:
print("Writing test data using embedded client")
config = {"hosts": [("127.0.0.1", 3000)]}

try:
    client = aerospike.client(config).connect()
except Exception as e:
    # Raise instead of sys.exit(): in a notebook sys.exit() just raises
    # SystemExit (the kernel keeps running) and drops the traceback, whereas
    # chaining with `from e` stops the cell and keeps the original error attached.
    raise RuntimeError(
        "failed to connect to the cluster with {}: {}".format(config["hosts"], e)
    ) from e

try:
    # Seed ten records in namespace "test", set "spark-test".
    for num in range(10):
        key = ("test", "spark-test", "spark-test" + str(num))
        print(str(key))
        client.put(key, {"one": num, "two": "two_" + str(num), "three": num})
finally:
    # The client is only needed to seed the data; release its connection.
    client.close()

print("Loading test data to dataframe")
thingsDF = (
    spark.read.format("com.aerospike.spark.sql")
    .option("aerospike.set", "spark-test")
    .load()
)

print("All test data")
thingsDF.show()

print("Filtering test data")
# createOrReplaceTempView replaces the deprecated registerTempTable.
thingsDF.createOrReplaceTempView("things")
filteredThings = spark.sql("select * from things where one = 5")
print("Just thing number 5")
filteredThings.show()

## Example 2

An example that uses a schema to save DataFrames, then runs queries and aggregations on the saved data.

In [None]:
rows = [
    ("Bill_Malcolm", "Bill", "Malcolm", 1975, "Indie"),
    ("James_Bob", "James", "Bob", 1983, "Rep"),
    ("Martin_Paul", "Martin", "Paul", 1991, "Dem"),
    ("Smith_John", "Smith", "John", 1996, "Indie"),
    ("Smallberries_Kevin", "Smallberries", "Kevin", 2007, "Dem"),
    ("Kohl_Jen", "Kohl", "Jen", 2009, "Indie"),
    ("Prichard_Julia", "Prichard", "Julia", 2010, "Rep"),
    ("Williams_Tony", "Williams", "Tony", 2013, "Indie"),
    ("Babka_Malcom", "Babka", "Malcom", 2015, "Rep"),
]

# Column layout of each tuple above: key, last name, first name, birth year, party.
schema = StructType(
    [
        StructField("key", StringType(), True),
        StructField("last", StringType(), True),
        StructField("first", StringType(), True),
        StructField("bornDate", LongType(), True),
        StructField("party", StringType(), True),
    ]
)
# Build the DataFrame directly from the local rows through the SparkSession;
# neither an intermediate RDD nor the deprecated SQLContext is needed.
newDF = spark.createDataFrame(rows, schema)

# aerospike.updateByKey names the "key" column for the connector
# (presumably used as each record's primary key — confirm in connector docs).
(
    newDF.write.mode("overwrite")
    .format("com.aerospike.spark.sql")
    .option("aerospike.set", "people")
    .option("aerospike.updateByKey", "key")
    .save()
)

# Read through `spark` (like Example 1) so the aerospike.* settings applied to
# spark.conf are the ones used.
peopleDF = (
    spark.read.format("com.aerospike.spark.sql")
    .option("aerospike.set", "people")
    .load()
)
peopleDF.createOrReplaceTempView("people")

print("All voters")
peopleDF.show()
print("Voter parties")
peopleDF.groupBy("party").count().show()

# Voters born before this year count as "of legal age".
# NOTE(review): fixed reference year, so the result does not move with the
# calendar — confirm that is intended.
LEGAL_AGE_BORN_BEFORE = 2001

print("Independent voters of legal age")
# Single-quoted string literal: portable SQL, and it keeps working when Spark's
# ANSI double-quoted-identifier mode is enabled.
voterDF = spark.sql(
    "SELECT * FROM people WHERE bornDate < {} AND party = 'Indie'".format(
        int(LEGAL_AGE_BORN_BEFORE)
    )
)
voterDF.show()

print("Writing filtered voters to independent_voters set")
(
    voterDF.write.mode("overwrite")
    .format("com.aerospike.spark.sql")
    .option("aerospike.set", "independent_voters")
    .option("aerospike.updateByKey", "key")
    .save()
)

You can inspect the filtered data with the Aerospike `aql` tool:


```
aql> select * from test.independent_voters
+----------+-----------+----------------+---------+---------+
| bornDate | first     | key            | last    | party   |
+----------+-----------+----------------+---------+---------+
| 1975     | "Malcolm" | "Bill_Malcolm" | "Bill"  | "Indie" |
| 1996     | "John"    | "Smith_John"   | "Smith" | "Indie" |
+----------+-----------+----------------+---------+---------+
2 rows in set (0.054 secs)
```

In [None]:

# Stop the SparkSession; this also stops its underlying SparkContext (`sc`)
# and clears the session, so a fresh "Run All" starts cleanly.
spark.stop()
print("Done")