# 8.1 Spark Streaming

## Apache Kafka

### Kafka Producer

In [None]:
import time
import json
from kafka import KafkaProducer


# Producer whose message values are serialized as UTF-8 encoded JSON.
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda x: json.dumps(x).encode('utf-8')
)

data = {'hello': 'world', 'time': time.time()}
producer.send('dsp', data)
# send() only enqueues the record in the client-side buffer; flush() blocks
# until buffered records have been delivered to the broker.
producer.flush()

### Kafka Consumer

In [None]:
import json
from kafka import KafkaConsumer


def _decode_json(raw_bytes):
    """Turn a raw Kafka message value (UTF-8 JSON bytes) into a Python object."""
    return json.loads(raw_bytes.decode('utf-8'))


# Subscribe to the 'dsp' topic on the local broker.
consumer = KafkaConsumer(
    'dsp',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=_decode_json,
)

# Iterate over incoming records indefinitely, printing each decoded payload.
for record in consumer:
    print(record.value)

## Sklearn Streaming

In [None]:
import os
import json


from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

external_ip = os.environ['KAFKA_BOOTSTRAP_SERVER']


@udf(returnType=StringType())
def deserialize_value(raw):
    """Decode a Kafka record value (UTF-8 JSON bytes) and re-emit it as a JSON string.

    Returns None for records whose value is null instead of failing the
    whole streaming query.
    """
    if raw is None:
        return None
    # The Kafka source exposes `value` as binary; round-tripping through
    # json validates the payload and gives a readable, normalized string.
    return json.dumps(json.loads(bytes(raw).decode('utf-8')))


# Read the 'dsp' topic from the earliest available offset and attach the
# decoded payload as an extra column.
df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", f"{external_ip}:9092")
    .option("subscribe", "dsp")
    .option("startingOffsets", "earliest")
    .load()
    .withColumn("value_deserialized", deserialize_value("value"))
)

display(df)

### Model Producer

In [None]:
import os
import json
import time
import uuid
from kafka import KafkaProducer


external_ip = os.environ['KAFKA_BOOTSTRAP_SERVER']

# JSON-encode every message value before it is written to Kafka.
producer = KafkaProducer(
    bootstrap_servers=[f'{external_ip}:9092'],
    value_serializer=lambda payload: json.dumps(payload).encode('utf-8'),
)

# One record: feature columns G1..G10 (presumably game indicators — only G1
# is set) followed by a freshly generated User_ID.
data = {f'G{i}': 0 for i in range(1, 11)}
data['G1'] = 1
data['User_ID'] = str(uuid.uuid1())

result = producer.send('dsp', data)
# Block until the send completes; its metadata is the cell's output.
result.get()

### Streaming Pipeline 

In [None]:
import os
import json
import pandas as pd
from pyspark.sql.types import StringType 
from sklearn.linear_model import LogisticRegression


external_ip = os.environ['KAFKA_BOOTSTRAP_SERVER']

# Train a logistic regression model on the games dataset: every column
# except 'label' is used as a feature, 'label' is the target.
GAMES_URL = "https://github.com/bgweber/Twitch/raw/master/Recommendations/games-expand.csv"
games_df = pd.read_csv(GAMES_URL)
train_x = games_df.drop(columns='label')
train_y = games_df['label']
model = LogisticRegression()
model.fit(train_x, train_y)

# Subscribe to the 'dsp' Kafka topic as a streaming DataFrame.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", f"{external_ip}:9092")
    .option("subscribe", "dsp")
    .load()
)

# define the UDF for scoring users
def score(value):
    """Score one JSON-encoded user record with the fitted logistic regression `model`.

    Args:
        value: JSON string holding a 'User_ID' key plus the model's feature
            columns (the model producer sends G1..G10).

    Returns:
        JSON string {"User_ID": ..., "pred": p}, where p is the predicted
        probability of the second class in `model.classes_`.

    Raises:
        KeyError: if the record lacks 'User_ID' or a feature column.
    """
    new_row = json.loads(value)
    new_x = pd.DataFrame([new_row])
    # Select features by name, in fit-time order. sklearn rejects frames
    # whose column order differs from training. This also drops User_ID and
    # any extra keys a producer might add.
    feature_names = getattr(model, 'feature_names_in_', None)
    if feature_names is not None:
        new_x = new_x[list(feature_names)]
    else:
        new_x = new_x.drop(columns='User_ID')
    pred = model.predict_proba(new_x)[0][1]
    # Convert the numpy scalar to a built-in float for plain JSON output.
    result = {'User_ID': new_row['User_ID'], 'pred': float(pred)}
    return json.dumps(result)
  
from pyspark.sql.functions import udf

# Cast the binary Kafka value to a string and score each record with the UDF.
df = df.selectExpr("CAST(value AS STRING)")
score_udf = udf(score, StringType())
df = df.select(score_udf("value").alias("value"))

# Write the scored records to the 'preds' topic (the Kafka sink reads the
# 'value' column). The checkpoint directory records query progress.
# NOTE(review): "/temp" is a generic shared path; a dedicated per-query
# directory would avoid clashing with other streaming queries.
query = (
    df.writeStream.format("kafka")
    .option("kafka.bootstrap.servers", f"{external_ip}:9092")
    .option("topic", "preds")
    .option("checkpointLocation", "/temp")
    .start()
)

### Model Consumer

In [None]:
import os
import json
from kafka import KafkaConsumer


# os.environ (as in the other cells) raises KeyError when the variable is
# unset, instead of silently connecting to "None:9092".
external_ip = os.environ['KAFKA_BOOTSTRAP_SERVER']

# Consume scored predictions; values are UTF-8 encoded JSON.
consumer = KafkaConsumer(
    'preds',
    bootstrap_servers=[f'{external_ip}:9092'],
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

for x in consumer:
    print(x.value)

# 8.2 Dataflow Streaming

## PubSub

### PubSub Consumer

In [None]:
import os
from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    project=os.environ['GOOGLE_PROJECT_ID'],
    subscription="dsp"
)


def callback(message):
    """Print the raw payload of a received Pub/Sub message and acknowledge it."""
    print(message.data)
    message.ack()


future = subscriber.subscribe(
    subscription=subscription_path,
    callback=callback
)

# Leaving the `with` block closes the subscriber client.
with subscriber:
    try:
        future.result()
    except KeyboardInterrupt:
        future.cancel()  # trigger shutdown of the streaming pull
        future.result()  # block until the shutdown has completed

### PubSub Producer

In [None]:
import os
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(
    project=os.environ['GOOGLE_PROJECT_ID'],
    topic="natality"
)

data = "Hello World!".encode('utf-8')
# publish() is asynchronous and returns a future; result() blocks until the
# server accepts the message (raising on failure) and returns its message ID.
message_id = publisher.publish(topic=topic_path, data=data).result()
message_id

## Append Streaming

In [26]:
%%file scripts/stream.py

import argparse
import apache_beam as beam

from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions

# define a function for transforming the data
class AppendDoFn(beam.DoFn):
    """Prefix each Pub/Sub payload with "Hello World! - ", print it and emit it."""

    def process(self, element):
        # The payload arrives as raw bytes.
        appended = "Hello World! - " + element.decode('utf-8')
        print(appended)
        # Emit the result so downstream transforms can consume it.
        yield appended
        
# set up pipeline options; unknown command-line flags are forwarded to Beam
from apache_beam.options.pipeline_options import StandardOptions

parser = argparse.ArgumentParser()
known_args, pipeline_args = parser.parse_known_args()
pipeline_options = PipelineOptions(pipeline_args)
# ReadFromPubSub is an unbounded source, so the pipeline must run in
# streaming mode even when --streaming is not passed on the command line.
pipeline_options.view_as(StandardOptions).streaming = True
project = pipeline_options.view_as(GoogleCloudOptions).project
if not project:
    raise ValueError("--project is required to build the Pub/Sub topic path")

# define the topics
topic = "projects/{project}/topics/{topic}"
topic = topic.format(project=project, topic="natality")

# define the pipeline steps
p = beam.Pipeline(options=pipeline_options)
(p
 | 'Read PubSub' >> beam.io.ReadFromPubSub(topic=topic)
 | 'Append' >> beam.ParDo(AppendDoFn()))

# run the pipeline
result = p.run()
result.wait_until_finish()

Overwriting scripts/stream.py


## Natality Streaming

### Streaming Pipeline

In [27]:
%%file scripts/natality.py

import json
import argparse
import joblib
import pandas as pd
import apache_beam as beam

from google.cloud import storage
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.io.gcp.datastore.v1new.types import Key
from apache_beam.io.gcp.datastore.v1new.types import Entity
from apache_beam.io.gcp.datastore.v1new.datastoreio import WriteToDatastore

class ApplyDoFn(beam.DoFn):
    """Predict a weight for each JSON natality record with a scikit-learn model.

    The model is downloaded from GCS and loaded with joblib once per DoFn
    instance, in ``setup``; ``process`` lazily loads it if that did not run.
    """

    # Model inputs, in input order. These are the first eight keys of the
    # records sent by the streaming producer.
    FEATURES = ['year', 'plurality', 'apgar_5min', 'mother_age',
                'father_age', 'gestation_weeks', 'ever_born', 'mother_married']

    def __init__(self):
        self.model = None

    def setup(self):
        self._load_model()

    def _load_model(self):
        """Download the serialized model from GCS and load it into self.model."""
        bucket = storage.Client().get_bucket('dsp_model_store_00')
        blob = bucket.get_blob('natality/sklearn-linear')
        blob.download_to_filename('sklearn-linear')
        # joblib.load unpickles the file: only load artifacts from a trusted bucket.
        self.model = joblib.load('sklearn-linear')

    def process(self, element):
        if self.model is None:
            self._load_model()

        element = json.loads(element.decode('utf-8'))
        # Select features by name rather than position, so reordered or extra
        # keys cannot shift values into the wrong feature; missing ones become 0.
        new_x = pd.DataFrame([element]).reindex(columns=self.FEATURES).fillna(0)
        weight = float(self.model.predict(new_x)[0])
        yield {'guid': element['guid'],
               'weight': weight,
               'time': str(element['time'])}
        
class CreateEntityDoFn(beam.DoFn):
    """Turn a prediction dict into a Datastore entity keyed by ('natality-guid', guid)."""

    def process(self, element):
        entity = Entity(Key(['natality-guid', element['guid']]))
        # Only the prediction and its timestamp are stored as properties.
        properties = {name: element[name] for name in ('weight', 'time')}
        entity.set_properties(properties)
        yield entity
        
# set up pipeline options; unknown command-line flags are forwarded to Beam
from apache_beam.options.pipeline_options import StandardOptions

parser = argparse.ArgumentParser()
known_args, pipeline_args = parser.parse_known_args()
pipeline_options = PipelineOptions(pipeline_args)
# ReadFromPubSub is an unbounded source, so the pipeline must run in
# streaming mode even when --streaming is not passed on the command line.
pipeline_options.view_as(StandardOptions).streaming = True
project = pipeline_options.view_as(GoogleCloudOptions).project
if not project:
    raise ValueError("--project is required for the Pub/Sub topic and Datastore writes")

# define the topics
topic = "projects/{project}/topics/{topic}"
topic = topic.format(project=project, topic="natality")

# define the pipeline steps: read -> predict -> build entities -> persist
p = beam.Pipeline(options=pipeline_options)
(p
 | 'Read PubSub' >> beam.io.ReadFromPubSub(topic=topic)
 | 'Apply Model' >> beam.ParDo(ApplyDoFn())
 | 'Create Entities' >> beam.ParDo(CreateEntityDoFn())
 | 'Save to Datastore' >> WriteToDatastore(project)
)

# run the pipeline
result = p.run()
result.wait_until_finish()

Overwriting scripts/natality.py


### Streaming Producer

In [None]:
import os
import json
import time
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(
    project=os.environ['GOOGLE_PROJECT_ID'],
    topic="natality"
)

# One natality record encoded as UTF-8 JSON; 'time' and 'guid' are carried
# through the pipeline to the Datastore entity.
data = json.dumps({
    'year': 2001,
    'plurality': 1,
    'apgar_5min': 99,
    'mother_age': 33,
    'father_age': 40,
    'gestation_weeks': 38,
    'ever_born': 8,
    'mother_married': 1,
    'weight': 6.8122838958,
    'time': str(time.time()),
    'guid': 'b281c5e8-85b2-4cbd-a2d8-e501ca816363'
}).encode('utf-8')

# publish() is asynchronous; result() waits until the server accepts the
# message (raising on failure) and returns its message ID.
message_id = publisher.publish(topic=topic_path, data=data).result()
message_id