## satori2Kafka 

In [None]:
from __future__ import print_function
import socket
import json
import sys
import threading
import time
from satori.rtm.client import make_client, SubscriptionMode
from kafka import KafkaProducer

def satori2kafka(channel, endpoint, appkey,
                 bootstrap_servers=None, topic="world-feed"):
    """Forward every message of a Satori RTM channel to a Kafka topic.

    Subscribes to ``channel`` and publishes each received message to
    ``topic`` as UTF-8 encoded JSON, exactly once and in arrival order.
    The function does not return on its own: it keeps forwarding until it
    is interrupted (e.g. by cancelling the notebook cell).

    Args:
        channel: Name of the Satori channel to subscribe to.
        endpoint: Satori RTM endpoint URL.
        appkey: Satori application key.
        bootstrap_servers: Kafka bootstrap servers handed to
            ``KafkaProducer``; defaults to ``['localhost:9092']``.
        topic: Kafka topic the messages are published to.

    Raises:
        SystemExit: With status 1 if no message arrives within 10 seconds
            of subscribing.
    """
    # Local import so the block does not depend on the file's import cell.
    try:
        import queue
    except ImportError:  # Python 2 name of the same module
        import Queue as queue

    if bootstrap_servers is None:
        bootstrap_servers = ['localhost:9092']
    producer = KafkaProducer(bootstrap_servers=bootstrap_servers)

    # A FIFO queue hands each message over exactly once. The previous
    # list-based mailbox was never drained, so the forwarding loop re-sent
    # every message it had ever received on each pass.
    mailbox = queue.Queue()

    class SubscriptionObserver(object):
        def on_subscription_data(self, data):
            # NOTE(review): presumably invoked from the client's own thread
            # (the original synchronised with a threading.Event) — Queue is
            # safe either way.
            for message in data['messages']:
                mailbox.put(message)

    try:
        with make_client(endpoint=endpoint, appkey=appkey) as client:
            print('Connected!')
            client.subscribe(
                channel,
                SubscriptionMode.SIMPLE,
                SubscriptionObserver())

            # Fail fast if the channel stays silent after subscribing.
            try:
                message = mailbox.get(timeout=10)
            except queue.Empty:
                print("Timeout while waiting for a message")
                sys.exit(1)

            while True:
                msg = json.dumps(message, ensure_ascii=False)
                producer.send(topic, msg.encode('utf-8'))
                # do not send the messages too fast during development
                time.sleep(0.5)
                # Blocks until the next message arrives (no busy-waiting).
                message = mailbox.get()
    finally:
        # Release the producer on timeout, error or manual cancellation.
        producer.close()

## helper functions for data enrichment

In [None]:
import langid

def get_language_from_text(text):
    """Return the language label that ``langid.classify`` assigns to ``text``.

    Args:
        text: The text to classify. May be ``None`` or empty, e.g. when the
            source record had no description.

    Returns:
        The language label reported by langid, or ``"unknown"`` when there
        is no text to classify.
    """
    # Nullable columns reach this function as None; there is nothing to
    # classify, so report the same placeholder the country lookup uses.
    if not text:
        return "unknown"
    lang, _prob = langid.classify(text)  # the confidence score is not used
    return lang

In [None]:
from geolite2 import geolite2
import socket
from urllib.parse import urlparse


def get_country_from_url(url):
    """Return the ISO country code of the server hosting ``url``.

    The URL's host name is resolved to an IP address, which is then looked
    up in the geolite2 database. The lookup is best effort: any failure
    (no host in the URL, DNS error, address missing from the database)
    yields ``"unknown"`` instead of raising.

    Args:
        url: The URL whose host should be located.

    Returns:
        The value of ``result['country']['iso_code']`` from the geolite2
        lookup, or ``"unknown"`` if it could not be determined.
    """
    try:
        # .hostname strips any port and credentials; the raw netloc
        # ("host:8080", "user@host") cannot be resolved by gethostbyname.
        hostname = urlparse(url).hostname
        if not hostname:
            return "unknown"
        ip = socket.gethostbyname(hostname)
    except Exception:
        # Deliberately broad (best effort), but unlike a bare ``except`` it
        # lets KeyboardInterrupt / SystemExit propagate.
        return "unknown"

    try:
        result = geolite2.reader().get(ip)
        return result['country']['iso_code']
    except Exception:
        # e.g. the address is not in the database (result is None) or the
        # record carries no country entry.
        return "unknown"
    finally:
        geolite2.close()

## define the schema of the initial rss feed from satori

In [None]:
from pyspark.sql.types import *

# The feed's record layout is known up front, so the schema is declared
# explicitly instead of being inferred by Spark: one timestamp field followed
# by four string fields, all nullable.
jsonSchema = StructType(
    [StructField("publishedTimestamp", TimestampType(), True)]
    + [
        StructField(field_name, StringType(), True)
        for field_name in ("url", "feedURL", "title", "description")
    ]
)

## Stream Processing 
Read from Kafka into a streaming DataFrame.

In [None]:
from pyspark.sql.functions import *

# Open the "world-feed" Kafka topic as a streaming source, keep the source's
# `timestamp` column and decode each record's value (cast to a string) into a
# struct that follows jsonSchema.
parsed = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "localhost:9092",
        "subscribe": "world-feed",
    })
    .load()
    .select(
        "timestamp",
        from_json(col("value").cast("string"), jsonSchema).alias("parsed_value"),
    )
)

# Flatten the decoded struct so every schema field becomes a top-level column.
worldfeed = parsed.select("timestamp", "parsed_value.*")
worldfeed.printSchema()
worldfeed.isStreaming

### enrich the streaming data frame with language and country of origin

In [None]:
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
from worldfeed.location_lookup import get_country_from_url

# Wrap the Python helpers as Spark UDFs that return strings.
language_classify_udf = udf(get_language_from_text, StringType())
get_country_from_url_udf = udf(get_country_from_url, StringType())

# Add two derived columns: the language of the description text and the
# country of the server behind the feed URL.
enriched_df = (
  worldfeed
    .withColumn('language', language_classify_udf(worldfeed['description']))
    .withColumn('server_country', get_country_from_url_udf(worldfeed['feedURL']))
)
enriched_df.printSchema()
# Kept as the last expression so the notebook displays its value (it was
# silently discarded when it preceded printSchema()).
enriched_df.isStreaming

## start the streaming
Aggregate the data and write the results back to Kafka for further processing.

In [None]:
# Keep shuffles small: two partitions are used for the aggregation.
spark.conf.set("spark.sql.shuffle.partitions", "2")

# Count records per (server_country, language) in 15-minute windows and
# publish every updated count to the "enriched-feed" Kafka topic as JSON.
query = (
    enriched_df
    .withWatermark("timestamp", "15 minutes")
    .groupBy(
        "server_country",
        "language",
        window("timestamp", "15 minutes"),
    )
    .count()
    .select(
        to_json(struct("server_country", "window")).alias("key"),
        to_json(
            struct("window.start", "window.end", "server_country", "language", "count")
        ).alias("value"),
    )
    .writeStream
    .trigger(processingTime='5 seconds')
    # For debugging, replace the Kafka sink below with .format("console").
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "enriched-feed")
    .option("checkpointLocation", "./checkpoints")
    # "update" mode: each trigger emits only the rows whose count changed.
    .outputMode("update")
    .queryName("worldfeed")
    .start()
)

## Start the Satori 2 Kafka Stream

In [None]:
# Connection settings for the Satori channel that feeds Kafka.
channel, endpoint, appkey = (
    "big-rss",
    "wss://open-data.api.satori.com",
    "8e7f2BeFE8C8c6e8A4A41976a2dE5Fa9",
)

# This call does not return by itself; cancel the cell manually to stop it.
satori2kafka(channel=channel, endpoint=endpoint, appkey=appkey)

### helpers

In [None]:
# Stop the streaming query held in `query` (the object returned by .start()).
query.stop()