# Spark Streaming
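The goal of this notebook is to consume the `gemini-feed` Kafka topic and produce to the `spark.out` topic a feed that includes the order price-volume ratio and bid/ask liquidity for BTC. As currently written, however, it does not touch Kafka. It streams JSON records from text files in the `sample/` directory into the `stream-test` Elasticsearch index with Spark Streaming, then reads them back into a Spark DataFrame for a simple aggregation.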

In [None]:
# Fetch the elasticsearch-hadoop 6.1.1 connector.
# - https: the jar ends up on the JVM classpath, so don't fetch it over plaintext HTTP.
# - -nc: skip the download when the zip is already present, so re-running the cell
#   does not pile up elasticsearch-hadoop-6.1.1.zip.1, .zip.2, ...
!wget -nc https://download.elastic.co/hadoop/elasticsearch-hadoop-6.1.1.zip

In [None]:
# Unpack the connector.
# - -n: never overwrite files left by an earlier run. Without it, unzip stops to ask
#   "replace ...?", and a notebook cell cannot answer that prompt.
# - -q: keep the file listing out of the cell output.
!unzip -n -q elasticsearch-hadoop-6.1.1.zip

In [5]:
# Pin the Python client to 6.1.1: the release this notebook was run with, which also
# matches the elasticsearch-hadoop 6.1.1 connector.
# An unpinned install pulls the newest client. Its major version may not match the
# cluster (presumably 6.x, given the connector), and its API has changed: newer
# clients require keyword arguments, e.g. es.indices.exists(index=...).
# Installing through the kernel's own interpreter puts the package where this
# notebook imports from.
import sys
!"{sys.executable}" -m pip install elasticsearch==6.1.1

Collecting elasticsearch
  Downloading elasticsearch-6.1.1-py2.py3-none-any.whl (59kB)
[K    100% |████████████████████████████████| 61kB 459kB/s ta 0:00:01
Installing collected packages: elasticsearch
Successfully installed elasticsearch-6.1.1


In [41]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
import json
import time
from datetime import datetime
from elasticsearch import Elasticsearch

In [147]:
# Connect to Elasticsearch on port 9200. This notebook runs on the Spark node, so the
# host is `elasticsearch` (the name of the docker container running Elasticsearch)
# rather than `localhost`.
es = Elasticsearch('elasticsearch:9200')

# Start every run from an empty `stream-test` index:
# - drop the index if a previous run left it behind;
# - then (re)create it.
# Creating unconditionally means the index also exists on the very first run, so
# reading it back works even before anything has been written.
# Keyword arguments are used because newer clients no longer accept positional ones.
if es.indices.exists(index='stream-test'):
    es.indices.delete(index='stream-test')
es.indices.create(index='stream-test')

In [148]:
import os

# elasticsearch-hadoop Spark connector jar. It is put on the driver classpath so the
# org.elasticsearch.hadoop.mr input/output formats used below can be loaded.
# Assumes the zip from the download cell has been unpacked in the working directory.
ES_HADOOP_JAR = 'elasticsearch-hadoop-6.1.1/dist/elasticsearch-spark-20_2.11-6.1.1.jar'

# PySpark reads this when it launches the JVM, so it must be set before the
# SparkContext is created.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--driver-class-path=' + ES_HADOOP_JAR + ' pyspark-shell'

In [149]:
# SparkContext for the streaming half of the notebook. WARN keeps Spark's INFO
# logging out of the cell output.
STREAMING_APP_NAME = "PythonSparkStreaming"
sc = SparkContext(appName=STREAMING_APP_NAME)
sc.setLogLevel(logLevel="WARN")

In [150]:
# Micro-batch interval, in seconds. Each batch picks up whatever arrived during the
# last BATCH_INTERVAL_SECONDS.
BATCH_INTERVAL_SECONDS = 3
ssc = StreamingContext(sc, batchDuration=BATCH_INTERVAL_SECONDS)

In [151]:
# Monitor the local `sample/` directory. New files that appear there are read as
# text, one stream element per line.
SAMPLE_DIR = 'sample/'
stream = ssc.textFileStream(directory=SAMPLE_DIR)

In [152]:
def format_sample(x, tz=None):
    """Turn one raw JSON line from the sample stream into an Elasticsearch-ready pair.

    The record's epoch ``timestamp`` is rewritten as a ``'%Y/%m/%d %H:%M:%S'``
    string, and its ``count`` field is renamed to ``doc_id``. The write config
    maps ``doc_id`` to the Elasticsearch document id.

    Args:
        x: a JSON object encoded as a string. It must contain a numeric
            ``timestamp`` (seconds since the epoch) and a ``count`` key.
        tz: optional ``tzinfo`` used to render ``timestamp``.
            - ``None`` (default) keeps the original behaviour: the local time
              zone of the process running the function.
            - ``datetime.timezone.utc`` gives output that does not depend on
              the machine's time zone.

    Returns:
        ``(doc_id, json_string)``, where ``json_string`` is the re-serialised
        record.

    Raises:
        ValueError (json.JSONDecodeError): if ``x`` is not valid JSON.
        KeyError: if ``timestamp`` or ``count`` is missing.
    """
    data = json.loads(x)
    data['timestamp'] = datetime.fromtimestamp(data['timestamp'], tz).strftime('%Y/%m/%d %H:%M:%S')
    # Popping and re-adding moves the id to the end of the serialised document.
    data['doc_id'] = data.pop('count')
    return (data['doc_id'], json.dumps(data))

In [153]:
# Turn every text line arriving in `sample/` into a (doc_id, json_string) pair.
parsed = stream.map(format_sample)

In [154]:
def handler(rdd):
    """Index one micro-batch of ``(key, json_string)`` pairs into Elasticsearch.

    Writes go through elasticsearch-hadoop's ``EsOutputFormat`` to the
    ``stream-test/sample`` resource on ``elasticsearch:9200``:
    - the values are already JSON strings (``es.input.json``);
    - the ``doc_id`` field inside each document is used as the Elasticsearch
      document id (``es.mapping.id``).

    Empty batches are skipped. ``foreachRDD`` calls this once per batch interval,
    including intervals in which no new input arrived, and there is nothing to
    write then.

    Keep the single ``rdd`` parameter: ``foreachRDD`` decides between calling
    ``f(rdd)`` and ``f(time, rdd)`` based on the function's arity.

    Args:
        rdd: RDD of ``(key, json_string)`` pairs, as produced by ``format_sample``.
    """
    if rdd.isEmpty():
        return

    es_write_conf = {
        "es.nodes": 'elasticsearch',
        "es.port": '9200',
        "es.resource": 'stream-test/sample',
        "es.mapping.id": "doc_id",
        "es.input.json": "yes",
    }

    rdd.saveAsNewAPIHadoopFile(
        # Placeholder path: the destination is the ES resource in es_write_conf.
        path='-',
        outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf=es_write_conf)
        
        

In [155]:
# Index each micro-batch into Elasticsearch. foreachRDD also accepts (time, rdd)
# callbacks and picks the calling convention from the function's arity, so
# `handler` must keep its single `rdd` parameter.
parsed.foreachRDD(handler)

In [156]:
# Echo the first 10 elements of every batch into the cell output.
parsed.pprint(num=10)

In [157]:
# Start consuming `sample/`. start() returns right away; batches keep running (and
# printing via pprint) in the background until ssc.stop() is called.
ssc.start()

-------------------------------------------
Time: 2018-01-16 20:51:18
-------------------------------------------
(582, '{"name": "Bilbo", "value": 33, "timestamp": "2018/01/16 20:51:16", "doc_id": 582}')
(583, '{"name": "Bilbo", "value": 10, "timestamp": "2018/01/16 20:51:17", "doc_id": 583}')

-------------------------------------------
Time: 2018-01-16 20:51:21
-------------------------------------------
(584, '{"name": "Legolas", "value": 0, "timestamp": "2018/01/16 20:51:18", "doc_id": 584}')
(586, '{"name": "Legolas", "value": 6, "timestamp": "2018/01/16 20:51:20", "doc_id": 586}')
(585, '{"name": "Samwise", "value": 55, "timestamp": "2018/01/16 20:51:19", "doc_id": 585}')

-------------------------------------------
Time: 2018-01-16 20:51:24
-------------------------------------------
(589, '{"name": "Samwise", "value": 21, "timestamp": "2018/01/16 20:51:23", "doc_id": 589}')
(587, '{"name": "Samwise", "value": 84, "timestamp": "2018/01/16 20:51:21", "doc_id": 587}')
(588, '{"na

In [158]:
# Stop streaming.
# - stopSparkContext=True also shuts down `sc`, which is why the reading section
#   starts a fresh SparkContext.
# - stopGraceFully=False does not wait for received data to finish processing.
ssc.stop(stopSparkContext=True, stopGraceFully=False)

-------------------------------------------
Time: 2018-01-16 20:51:30
-------------------------------------------
(594, '{"name": "Aragorn", "value": 4, "timestamp": "2018/01/16 20:51:28", "doc_id": 594}')
(595, '{"name": "Samwise", "value": 86, "timestamp": "2018/01/16 20:51:29", "doc_id": 595}')
(593, '{"name": "Gandalf", "value": 36, "timestamp": "2018/01/16 20:51:27", "doc_id": 593}')



In [159]:
# Fresh SparkContext for the batch-read half of the notebook; stopping the streaming
# context also stopped the previous one.
READ_APP_NAME = "PythonSparkReading"
sc = SparkContext(appName=READ_APP_NAME)
sc.setLogLevel(logLevel="WARN")

In [160]:
# Read the documents of `stream-test/sample` back out of Elasticsearch. Each element
# is a (document id, field dict) pair.
conf = {
    "es.resource": "stream-test/sample",
    "es.nodes": "elasticsearch",
}
es_rdd = sc.newAPIHadoopRDD(
    "org.elasticsearch.hadoop.mr.EsInputFormat",
    "org.apache.hadoop.io.NullWritable",
    "org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=conf,
)

In [161]:
# First five (document id, field dict) pairs read back from the index.
es_rdd.take(num=5)

[('589',
  {'doc_id': 589,
   'name': 'Samwise',
   'timestamp': '2018/01/16 20:51:23',
   'value': 21}),
 ('585',
  {'doc_id': 585,
   'name': 'Samwise',
   'timestamp': '2018/01/16 20:51:19',
   'value': 55}),
 ('586',
  {'doc_id': 586,
   'name': 'Legolas',
   'timestamp': '2018/01/16 20:51:20',
   'value': 6}),
 ('587',
  {'doc_id': 587,
   'name': 'Samwise',
   'timestamp': '2018/01/16 20:51:21',
   'value': 84}),
 ('593',
  {'doc_id': 593,
   'name': 'Gandalf',
   'timestamp': '2018/01/16 20:51:27',
   'value': 36})]

In [163]:
# Drop the document ids and keep only the field dicts.
# NOTE: this rebinds `es_rdd` to a derived RDD, so the cell is not idempotent.
# Re-running it without first re-running the newAPIHadoopRDD cell fails, because it
# would try to index into the dicts.
es_rdd = es_rdd.values()

In [164]:
# Confirm each element is now a plain field dict.
es_rdd.take(num=1)

[{'doc_id': 589,
  'name': 'Samwise',
  'timestamp': '2018/01/16 20:51:23',
  'value': 21}]

In [166]:
from pyspark.sql import SparkSession

# Wrap the running SparkContext in a SparkSession so the documents can be analysed as
# a DataFrame. getOrCreate() reuses the SparkContext that is already running, so the
# app name below only matters if no context is active.
spark = (
    SparkSession.builder
    .appName("Learning Apache Spark")
    .getOrCreate()
)


In [167]:
# Build a DataFrame from the document dicts; the schema is inferred from the records.
df = spark.createDataFrame(data=es_rdd)



In [168]:
# Peek at the first row (returned as a one-element list of Row).
df.head(1)

[Row(doc_id=589, name='Samwise', timestamp='2018/01/16 20:51:23', value=21)]

In [172]:
# Number of indexed documents per name.
df.groupBy('name').count().collect()

[Row(name='Aragorn', count=3),
 Row(name='Gandalf', count=1),
 Row(name='Legolas', count=3),
 Row(name='Samwise', count=5),
 Row(name='Bilbo', count=2)]

In [146]:
# Shut down the SparkContext used for reading back from Elasticsearch. Run this last:
# the `spark` session and `df` above depend on it.
sc.stop()