# Spark Streaming

In [None]:
import os
from pyspark.sql import SparkSession

# Create (or reuse) the Spark session for this notebook, with explicit resource
# limits and the Spark UI pinned to port 4046.
spark = (
    SparkSession.builder
    .appName("sparkGodoy")
    .config("spark.executor.memory", "3g")
    .config("spark.driver.memory", "2g")
    .config("spark.executor.cores", "2")
    .config("spark.cores.max", "2")
    .config("spark.ui.port", "4046")
    .getOrCreate()
)

In [None]:
# Display the SparkContext backing `spark` to inspect the running session.
spark.sparkContext

Recurso (`StreamRunner`, do pacote `nbthread_spark`) para executar o streaming no Jupyter sem bloquear o notebook.

In [None]:
from nbthread_spark.stream import StreamRunner

In [None]:
from pyspark.sql.types import StructField, StructType, StringType

# Schema for the A/B test CSV files: every column is a nullable string.
# The first header is empty ("") -- an unnamed, unused column in the file.
schema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in ("", "userId", "experiment", "alternative", "conversion")
])

O Structured Streaming (streaming estruturado) usa DataFrames; o Spark Streaming "antigo" (DStreams) usava RDDs.

In [None]:
def pshow(sparkDf, limit=10):
    """Return the first ``limit`` rows of a Spark DataFrame as a pandas DataFrame.

    Intended for quick previews in the notebook.

    Args:
        sparkDf: Spark DataFrame to preview.
        limit: maximum number of rows to fetch (default 10).

    Returns:
        The result of ``sparkDf.limit(limit).toPandas()``.
    """
    # Truncate on the Spark side first, so only ``limit`` rows are converted.
    head_df = sparkDf.limit(limit)
    return head_df.toPandas()

In [None]:
# Streaming source over every file matching "*test.csv", parsed with the
# explicit string schema (first line of each file is the header).
csv_stream = (
    spark.readStream
    .schema(schema)
    .option("header", "true")
    .option("sep", ",")
    .format("csv")
    .csv("*test.csv")
)

In [None]:
# Sink the stream into an in-memory table named "ab_impressions" and hand the
# query to StreamRunner so the notebook stays interactive.
query = (
    csv_stream.writeStream
    .queryName("ab_impressions")
    .format("memory")
    .start()
)
stream = StreamRunner(query)
stream.start()

In [None]:
# Row count of the "ab_impressions" in-memory table fed by the streaming query
# (may differ between runs while the stream is active).
ab_impressions_df = spark.sql("select * from ab_impressions")
ab_impressions_df.count()

In [None]:
# Stop the StreamRunner wrapping the "ab_impressions" query
# (presumably this also stops the underlying query -- verify in nbthread_spark).
stream.stop()

## Amazon Music Reviews

In [None]:
from pyspark.sql.types import StructField, StructType, StringType, FloatType, TimestampType

# Schema for the Amazon music reviews JSON files.
# NOTE(review): the name "unixReviewTime" suggests a Unix epoch value; confirm
# Spark parses it into the intended timestamp for this data.
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("asin", StringType()),
        ("summary", StringType()),
        ("overall", FloatType()),
        ("reviewerID", StringType()),
        ("unixReviewTime", TimestampType()),
    )
])

# Streaming source over every file matching "*amazon-music-reviews.json".
json_stream = (
    spark.readStream
    .schema(schema)
    .format("json")
    .json("*amazon-music-reviews.json")
)

# Sink into the in-memory table "amazon_reviews"; run it without blocking the notebook.
query = (
    json_stream.writeStream
    .queryName("amazon_reviews")
    .format("memory")
    .start()
)
stream = StreamRunner(query)
stream.start()

In [None]:
# Preview the first rows collected so far in the "amazon_reviews" table.
pshow(spark.sql("select * from amazon_reviews"), limit=10)

In [None]:
# Number of reviews per "overall" rating, ordered by rating (SQL version).
pshow(spark.sql(
    "select overall, count(*) as count "
    "from amazon_reviews "
    "group by overall "
    "order by overall"
))

In [None]:
# DataFrame-API version of the per-rating review count. groupBy().count() gives
# no ordering guarantee, so sort by rating to make the preview deterministic and
# match the ordered SQL query.
reviews_df = spark.sql("select * from amazon_reviews")
reviews_count_df = reviews_df.groupBy("overall").count().orderBy("overall")
pshow(reviews_count_df)

In [None]:
# Stop the StreamRunner wrapping the "amazon_reviews" query
# (presumably this also stops the underlying query -- verify in nbthread_spark).
stream.stop()

In [None]:
# List the tables/temp views in the current catalog; the in-memory tables queried
# above ("ab_impressions", "amazon_reviews") should appear here.
spark.catalog.listTables()

In [None]:
# NOTE(review): optional cleanup, kept disabled. "amazon_reviews_3" is not created
# in the visible cells -- presumably left over from an earlier run; confirm or remove.
# spark.catalog.dropTempView("amazon_reviews_3")

# Kafka
**A distributed streaming platform**

In [None]:
import json
import time
import csv

from kafka import KafkaProducer
def _to_json_bytes(value):
    """Serialize a message value to UTF-8 encoded JSON before it is sent to Kafka."""
    return json.dumps(value).encode("utf-8")


# Kafka producer whose message values are serialized as UTF-8 JSON.
producer = KafkaProducer(
    bootstrap_servers="localhost",  # no port given; the original note says 9092
    value_serializer=_to_json_bytes,
)

In [None]:
# Publish every row of ab_test.csv to the "ab_bootcamp_godoy" topic N_REPEATS
# times. Each row is a dict keyed by the CSV header, serialized by the producer.
N_REPEATS = 10
PAUSE_SECONDS = 0  # set > 0 (e.g. 10) to pause between passes and mimic a live feed

# Read the file once instead of re-opening it on every pass.
# newline="" is what the csv module expects for files passed to a reader.
with open("./ab_test.csv", newline="") as ab_file:
    ab_rows = []
    for row in csv.DictReader(ab_file):
        # Drop the unnamed, useless "" column; pop() with a default does not
        # raise if the file has no such column (del would raise KeyError).
        row.pop("", None)
        ab_rows.append(row)

for i in range(N_REPEATS):
    for row in ab_rows:
        producer.send("ab_bootcamp_godoy", row)
    print(i)
    if PAUSE_SECONDS:
        time.sleep(PAUSE_SECONDS)

# send() is asynchronous and buffers records; flush() blocks until every
# buffered record has been sent, so the topic is complete when the cell ends.
producer.flush()

In [None]:
from kafka import KafkaConsumer

# Read back what was published to "ab_bootcamp_godoy", starting from the first record.
# consumer_timeout_ms ends the iteration after that long without a new message;
# without it `for msg in consumer` blocks forever and the cell never finishes.
CONSUMER_IDLE_TIMEOUT_MS = 10000
MAX_MESSAGES_TO_PRINT = 20  # avoid flooding the notebook output

consumer = KafkaConsumer(
    bootstrap_servers="localhost",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),  # UTF-8 JSON bytes -> Python object
    auto_offset_reset="earliest",  # with no committed offset, start from the first record
    consumer_timeout_ms=CONSUMER_IDLE_TIMEOUT_MS,
)

consumer.subscribe(["ab_bootcamp_godoy"])

n_consumed = 0
for msg in consumer:
    n_consumed += 1
    if n_consumed <= MAX_MESSAGES_TO_PRINT:
        print(msg)
print(n_consumed, "messages consumed")

In [None]:
# Inspect the consumer's metrics.
consumer.metrics()

In [None]:
# Close the consumer once done reading.
consumer.close()