# Spark and Elasticsearch Integration
This notebook demonstrates how you can use Spark to read in streaming data, perform a transform on the data, and write the new data to Elasticsearch.

It also demonstrates how you can use Spark to read in an index from Elasticsearch.

### Installing ES-Hadoop

First we need to install the ES-Hadoop connector. It must be available at the same path on every node in your cluster.

In [None]:
import os
import urllib.request
import zipfile

If the ES-Hadoop connector is not already present, download and unpack it on the driver. Note again that in production it must be installed on every node in the cluster. This example uses a single-node cluster, so installing it on the driver is enough.

In [None]:
# download and unpack the connector only if it is not already in the working directory
if 'elasticsearch-hadoop-6.1.1' not in os.listdir():
    # urlretrieve replaces the deprecated urllib.request.URLopener
    urllib.request.urlretrieve("http://download.elastic.co/hadoop/elasticsearch-hadoop-6.1.1.zip", "es-hadoop.zip")

    with zipfile.ZipFile("es-hadoop.zip", "r") as zip_ref:
        zip_ref.extractall()

### Getting Elasticsearch Setup

Now, let's install the `elasticsearch` Python library on this Spark node so we can set up an Elasticsearch index quickly and easily.

In [None]:
!pip install elasticsearch

Let's go ahead and create an index called `stream-test`. If it already exists, let's wipe it out and start over:

In [None]:
from elasticsearch import Elasticsearch

In [None]:
# first set up Elasticsearch connection
# by default we connect to elasticsearch:9200 
# since we are running this notebook from the Spark-Node we need to use `elasticsearch` instead of `localhost`
# as this is the name of the docker container running Elasticsearch
es = Elasticsearch('elasticsearch:9200')

# if the stream-test index exists, wipe it out
if es.indices.exists(index='stream-test'):
    es.indices.delete(index='stream-test')

# create a fresh index whether or not one existed before
es.indices.create(index='stream-test')

### Spark Streaming to Elasticsearch

In [None]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
import json
import time
from datetime import datetime

We need to make sure that the ES-Hadoop connector is on the driver's classpath. `PYSPARK_SUBMIT_ARGS` has to be set before the Spark Context is created.

In [None]:
import os  
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars elasticsearch-hadoop-6.1.1/dist/elasticsearch-spark-20_2.11-6.1.1.jar pyspark-shell'  
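
A missing jar usually only surfaces later, as a class-not-found error when the first batch is written. The sketch below fails early instead. `build_submit_args` is a hypothetical helper, not part of PySpark, and it assumes a single jar passed via `--jars`.

```python
import os


def build_submit_args(jar_path):
    """Return a PYSPARK_SUBMIT_ARGS value for one jar, failing early if the jar is missing."""
    if not os.path.isfile(jar_path):
        raise FileNotFoundError("ES-Hadoop jar not found: " + jar_path)
    # keep the trailing `pyspark-shell` token, as in the cell above
    return "--jars {} pyspark-shell".format(jar_path)
```

In this notebook it could be used as `os.environ['PYSPARK_SUBMIT_ARGS'] = build_submit_args('elasticsearch-hadoop-6.1.1/dist/elasticsearch-spark-20_2.11-6.1.1.jar')`.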

Now we can create our Spark Context

In [None]:
sc = SparkContext(appName="PythonSparkStreaming")  
sc.setLogLevel("WARN") 

...and our Streaming Context

In [None]:
# note that the second argument is the batch interval in seconds
ssc = StreamingContext(sc, 3)  

Next, let's create a file stream. In this case we are going to stream in every new file written to the `sample` directory, which is continuously being filled with random data.

In [None]:
stream = ssc.textFileStream('sample/')
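
The notebook does not show the process that fills `sample/`, so here is a minimal sketch of one way to produce compatible input. It assumes each line is a JSON object with the fields the later cells use (`timestamp`, `count`, `name`). The helper name, the `start` parameter, and the name `Frodo` are made up for illustration. The file is written to a temporary location first and then moved into place, so the stream never sees a half-written file.

```python
import json
import os
import random
import tempfile
import time


def write_sample_file(directory, n_records, start=0, names=("Samwise", "Frodo")):
    """Write n_records JSON lines to a new file that appears in `directory` in one step."""
    directory = os.path.abspath(directory)
    os.makedirs(directory, exist_ok=True)
    # stage the file next to the target directory so the final move stays on one filesystem
    fd, tmp_path = tempfile.mkstemp(suffix=".json", dir=os.path.dirname(directory))
    with os.fdopen(fd, "w") as tmp:
        for i in range(start, start + n_records):
            record = {"timestamp": int(time.time()), "count": i, "name": random.choice(names)}
            tmp.write(json.dumps(record) + "\n")
    final_path = os.path.join(directory, "batch-{}.json".format(time.time_ns()))
    # move the finished file into the watched directory
    os.replace(tmp_path, final_path)
    return final_path
```

Because `count` later becomes the Elasticsearch document id, successive calls should pass a different `start` so that new files do not overwrite earlier documents.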

We are going to want to perform some slight transforms on this data. Primarily we want to parse the epoch time into a date string that Elasticsearch can understand.

In [None]:
def format_sample(x):
    data = json.loads(x)
    data['timestamp'] = datetime.fromtimestamp(data['timestamp']).strftime('%Y/%m/%d %H:%M:%S')
    data['doc_id'] = data.pop('count')
    return (data['doc_id'], json.dumps(data))
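
`datetime.fromtimestamp` without a time zone argument uses the local time zone of the machine running the code, so the same record can produce different strings on different nodes. The sketch below is a variant, not the notebook's own function, that pins the conversion to UTC. The input record is made up for illustration.

```python
import json
from datetime import datetime, timezone


def format_sample_utc(line):
    """Variant of format_sample that renders the epoch timestamp in UTC."""
    data = json.loads(line)
    data['timestamp'] = datetime.fromtimestamp(
        data['timestamp'], tz=timezone.utc).strftime('%Y/%m/%d %H:%M:%S')
    # rename `count` to `doc_id`, as in format_sample
    data['doc_id'] = data.pop('count')
    return (data['doc_id'], json.dumps(data))


doc_id, body = format_sample_utc('{"timestamp": 0, "count": 7, "name": "Samwise"}')
print(doc_id)  # 7
print(body)    # {"timestamp": "1970/01/01 00:00:00", "name": "Samwise", "doc_id": 7}
```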

We will map the function we just created to the stream so that each record, in each batch, is parsed with the same function.

In [None]:
parsed = stream.map(format_sample)

Next, let's define a function that writes the RDD generated by each streaming batch operation to Elasticsearch.

In [None]:
def handler(rdd):
    es_write_conf = {
        # specify the node that we are sending data to (this should be the master)
        "es.nodes": 'elasticsearch',

        # specify the port in case it is not the default port
        "es.port": '9200',

        # specify a resource in the form 'index/doc-type'
        "es.resource": 'stream-test/sample',

        # is the input JSON?
        "es.input.json": "yes",

        # the field in each document that should be used as the ES document ID
        "es.mapping.id": "doc_id",
    }

    rdd.saveAsNewAPIHadoopFile(
        path='-',
        outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",

        # critically, we must specify our `es_write_conf`
        conf=es_write_conf)
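
If several streams write to different indices, the configuration dictionary inside `handler` can be built by a small function instead of being hard-coded. This is only a sketch. `make_es_write_conf` is a hypothetical helper, and the keys are exactly the ones used in the cell above.

```python
def make_es_write_conf(index, doc_type, id_field, node="elasticsearch", port=9200):
    """Build an ES-Hadoop write configuration; every value is passed as a string."""
    return {
        "es.nodes": node,
        "es.port": str(port),
        # resource in the form 'index/doc-type'
        "es.resource": "{}/{}".format(index, doc_type),
        # each record value is already a JSON string
        "es.input.json": "yes",
        # field inside each document that holds the document id
        "es.mapping.id": id_field,
    }


es_write_conf = make_es_write_conf("stream-test", "sample", "doc_id")
```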
        
        

Now we can apply our handler to the RDD produced by each batch streaming through the system.

In [None]:
parsed.foreachRDD(handler)

We can also ask for the first few records of each batch to be printed to stdout.

In [None]:
parsed.pprint()

Finally, we can start the streaming context.

In [None]:
ssc.start()

-------------------------------------------
Time: 2018-01-18 14:56:42
-------------------------------------------

-------------------------------------------
Time: 2018-01-18 14:56:45
-------------------------------------------



If we want to stop the streaming context, we need to invoke its `stop` method. By default this also stops the underlying Spark Context.

In [None]:
ssc.stop()

### Bulk Processing ES with Spark 

Let's generate a new Spark Context since we killed our last one. We'll use this to operate on an entire index of ES data using Spark. In this case we'll read back in data we just shipped using Spark Streaming.

In [None]:
sc = SparkContext(appName="PythonSparkReading")  
sc.setLogLevel("WARN") 

In [None]:
es_read_conf = { 
    # specify the node that we are reading data from (this should be the master)
    "es.nodes" : "elasticsearch",
    
    # specify the read resource in the format 'index/doc-type'
    "es.resource" : "stream-test/sample"
    }

es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf=es_read_conf)
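
The read configuration above pulls the whole index. ES-Hadoop also accepts an `es.query` setting that restricts the documents returned. The sketch below builds such a configuration. `make_es_read_conf` is a hypothetical helper, and the `match` query on `name` is only an illustration.

```python
import json


def make_es_read_conf(index, doc_type, node="elasticsearch", query=None):
    """Build an ES-Hadoop read configuration, optionally limited by a query DSL body."""
    conf = {
        "es.nodes": node,
        "es.resource": "{}/{}".format(index, doc_type),
    }
    if query is not None:
        # es.query takes the query DSL as a JSON string
        conf["es.query"] = json.dumps(query)
    return conf


filtered_conf = make_es_read_conf(
    "stream-test", "sample",
    query={"query": {"match": {"name": "Samwise"}}})
```

Passing `filtered_conf` as `conf=` to `sc.newAPIHadoopRDD` would then filter inside Elasticsearch rather than in Spark.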

Let's take a sample of the data we just pulled from Elasticsearch. Note that each object is a tuple where the first item is the document id.

In [None]:
es_rdd.take(5)

Let's drop the document ids and keep only the documents themselves.

In [None]:
es_rdd = es_rdd.map(lambda x: x[1])

In [None]:
es_rdd.take(1)

Now, let's convert the RDD into a Spark SQL DataFrame so we can treat it more like a pandas object.

In [None]:
from pyspark.sql import SparkSession, SQLContext, Row

# Creating a SparkSession monkey patches RDDs with Spark SQL capabilities such as `toDF`
spark = SparkSession \
    .builder \
    .appName("Spark SQL") \
    .getOrCreate()

In [None]:
df = es_rdd.map(lambda l: Row(**dict(l))).toDF()

In [None]:
df.take(1)

We can execute a groupby on the data. In this case we'll group by `name`.

In [None]:
df \
    .groupby('name') \
    .count() \
    .collect()
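
To make the result concrete, here is the same aggregation in plain Python on a handful of made-up rows. It is only an illustration of what the group-and-count computes. The rows are invented, and Spark returns `Row` objects rather than tuples.

```python
from collections import Counter

# made-up rows standing in for the documents read back from Elasticsearch
rows = [
    {"name": "Samwise", "doc_id": 1},
    {"name": "Frodo", "doc_id": 2},
    {"name": "Samwise", "doc_id": 3},
]

# count how many rows share each value of `name`
counts = Counter(row["name"] for row in rows)
print(sorted(counts.items()))  # [('Frodo', 1), ('Samwise', 2)]
```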

We can also filter data as we would in Pandas

In [None]:
df \
    .filter(df.name == 'Samwise')\
    .take(1)

Again, we need to stop the context if we wish to switch back to a Streaming Context

In [None]:
sc.stop()