In [20]:
import sys
import os

#sys.path.insert(0, '/opt/cloudera/parcels/CDH/lib/spark/python/')
#sys.path.insert(0, '/opt/cloudera/parcels/CDH/lib/spark/python/lib/py4j-0.9-src.zip')

# Cluster-specific install locations for Java, Spark, and the Python
# interpreter named in PYSPARK_PYTHON.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-oracle/jre",
    "SPARK_HOME": "/opt/cloudera/parcels/CDH/lib/spark",
    "PYSPARK_PYTHON": "/opt/cloudera/parcels/Anaconda/bin/python",
})
# PYLIB is derived from SPARK_HOME, so it is set after the update above.
os.environ["PYLIB"] = "{0}/python/lib".format(os.environ["SPARK_HOME"])

# Prepend Spark's bundled archives to the import path in one step;
# pyspark.zip lands at index 0 with the py4j archive right behind it.
sys.path[:0] = [
    os.environ["PYLIB"] + "/pyspark.zip",
    os.environ["PYLIB"] + "/py4j-0.9-src.zip",
]

In [21]:
# __future__ imports must be the first statement of a compilation unit, so this
# line leads the cell; anywhere else it is a SyntaxError once the cell is
# compiled as a whole (e.g. after exporting the notebook to a script).
from __future__ import print_function

import json
from collections import OrderedDict

import requests
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import explode
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# Local-master Spark context with a streaming batch interval of 1 second.
sc = SparkContext('local', 'test-spark0')
ssc = StreamingContext(sc, 1)
sqlContext = SQLContext(sc)

# Kafka brokers and topic carrying the telemetry messages.
kafkaParams = {
    "metadata.broker.list": "pnda14.gspie.lab:9092,pnda15.gspie.lab:9092,pnda13.gspie.lab:9092",
    "auto.offset.reset": "largest",
}
topic = "telemetry"

kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams)

# Each stream element is indexed as a pair; keep only element [1] (the message
# payload) as a raw string. JSON decoding happens later, per batch, in transform().
#kafka_rdd = kafkaStream.map(lambda (k,v): json.loads(v))
kafka_rdd = kafkaStream.map(lambda v: v[1])

In [22]:
# Encoding path of the telemetry messages to forward; transform() drops every
# message whose encoding_path differs from this value.
path = (
    "Cisco-IOS-XR-ipv4-bgp-oper:bgp/instances/instance/"
    "instance-active/default-vrf/process-info"
)

# Leaf names that are emitted as metrics; any other leaf in a row's content
# is ignored by metrics_loader_copy().
filter_list = [
    "restart-count",
    "nexthop-count",
    "neighbors-count-total",
    "established-neighbors-count-total",
    "neighbors-count",
    "update-messages-received",
    "configuration-items-processed",
    "path-count",
    "network-count",
    "rib-connection-up-count",
    "inbound-update-messages",
]

In [23]:
def transform(rdd):
    """Turn one micro-batch of telemetry JSON strings into OpenTSDB datapoints and post them.

    Messages whose ``Telemetry.encoding_path`` equals the module-level ``path``
    are kept. For every entry of a message's ``Rows`` array, the leaves named in
    ``filter_list`` become datapoints tagged with the node id, subscription id,
    encoding path and the entry's non-empty ``Keys``; each entry's datapoints
    are posted with ``tsdb_api_put``.

    Args:
        rdd: RDD of JSON strings, one telemetry message each.

    Returns:
        list: the non-empty response bodies returned by ``tsdb_api_put`` for
        this batch (empty when nothing was posted). Returning an iterable
        rather than None keeps callers that pass the result on to
        ``sc.parallelize`` working.
    """
    # Idle batches are routine with a short batch interval. Without this guard
    # jsonRDD would be asked to infer a schema from no data, and the nested
    # field selection below would fail on the resulting column-less frame.
    if rdd.isEmpty():
        return []

    json_data = sqlContext.jsonRDD(rdd)
    df_select = json_data.select(
        "Telemetry.collection_end_time",
        "Telemetry.collection_id",
        "Telemetry.collection_start_time",
        "Telemetry.encoding_path",
        "Telemetry.msg_timestamp",
        "Telemetry.node_id_str",
        "Telemetry.subscription_id_str",
        "Rows",
        "Source")
    df_filtered = df_select.filter(df_select['encoding_path'] == path)
    df_data = df_filtered.select("node_id_str", "subscription_id_str", "encoding_path", "Rows")

    responses = []
    for row in df_data.collect():
        data = row.asDict()
        tags_master = {
            'NodeID': data['node_id_str'],
            'Subscription': data['subscription_id_str'],
            'EncodingPath': data['encoding_path'].replace(":", "-")
        }
        for content in data['Rows']:
            # Convert the Row once instead of once per field access.
            fields = content.asDict()

            # Message-level tags plus every key of this entry that has a value.
            tags = tags_master.copy()
            for key, value in fields["Keys"].asDict().items():
                if value:
                    tags[key] = value

            # Template shared by all datapoints of this entry; 'metric' and
            # 'value' are filled in per leaf by metrics_loader_copy.
            metrics = {
                # Divide by 1000 with explicit floor division so the result
                # stays an integer on both Python 2 and Python 3.
                "timestamp": fields["Timestamp"] // 1000,
                "tags": tags,
            }

            tsdb = []
            metrics_loader_copy(metrics, fields['Content'].asDict(), filter_list, tsdb)

            # requests serialises the payload itself, so no JSON round trip is needed.
            response = tsdb_api_put(tsdb)
            if response:
                print(response)
                responses.append(response)
    return responses


def metrics_loader_copy(metrics, content, filter_list, tsdb):
    """Walk ``content`` depth-first and append one datapoint per selected leaf to ``tsdb``.

    Args:
        metrics: template dict; a shallow copy is made for every datapoint.
        content: mapping of leaf name to value. Values that expose ``asDict``
            are treated as nested containers and walked recursively.
        filter_list: names of the leaves to emit.
        tsdb: output list, extended in place.
    """
    for name, value in content.items():
        # Nested container: recurse into its dict form.
        if 'asDict' in dir(value):
            metrics_loader_copy(metrics, value.asDict(), filter_list, tsdb)
            continue

        # Leaves without a value, or not asked for, are skipped.
        if value is None or name not in filter_list:
            continue

        datapoint = metrics.copy()
        datapoint['metric'] = name
        datapoint['value'] = value
        tsdb.append(datapoint)

def tsdb_api_put(data, host='gspie-opentsdb.cisco.com:4242', timeout=10):
    """POST datapoints to OpenTSDB's ``/api/put/details`` endpoint.

    Args:
        data: JSON-serialisable datapoints (a list of dicts). Nothing is sent
            when this is empty or None.
        host: ``host:port`` of the OpenTSDB HTTP API.
        timeout: seconds to wait for the server before giving up. Without a
            timeout, requests waits indefinitely, which would stall the
            calling streaming batch if OpenTSDB stops responding.

    Returns:
        The response body as text, or None when there was nothing to send.

    Raises:
        requests.exceptions.RequestException: on connection failure or timeout.
    """
    if not data:
        return None
    openTsdbUrl = 'http://' + host + '/api/put/details'
    response = requests.post(openTsdbUrl, json=data, timeout=timeout)
    return response.text

In [24]:
# Register transform() as the per-batch handler. It does its work through side
# effects (HTTP posts), so its return value is not needed; wrapping the call in
# sc.parallelize() fails with TypeError whenever transform() returns None.
kafka_rdd.foreachRDD(transform)
#kafka_rdd.pprint()
ssc.start()
#ssc.awaitTermination()

In [19]:
# Stop the streaming context created in the setup cell.
# NOTE(review): StreamingContext.stop() appears to stop the underlying
# SparkContext as well by default — confirm against the installed PySpark; if
# so, the setup cell must be re-run before the stream can be started again.
ssc.stop()