# Programming Model
The key idea in Structured Streaming is to treat a live data stream as a table that is being continuously appended. This leads to a new stream processing model that is very similar to a batch processing model.

In [None]:
# NOTE(review): cassandra-driver is not imported in any visible cell of this
# notebook — presumably intended for a later Cassandra sink; confirm or drop.
!pip install cassandra-driver

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType
import base64

In [None]:
# Get the active SparkSession, or build one named "StructuredNetworkWordCount".
spark = (
    SparkSession.builder
    .appName("StructuredNetworkWordCount")
    .getOrCreate()
)

In [None]:
# Streaming DataFrame fed by the Kafka topic "pn_classification".
lines = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka-service.default.svc.cluster.local:9092")
    .option("subscribe", "pn_classification")
    .load()
)

# Sanity checks: prints True for a DataFrame backed by a streaming source,
# then the columns the Kafka source exposes.
print(lines.isStreaming)
lines.printSchema()

In [None]:
# Split the lines into words, one output row per word.
# Kafka delivers `value` as binary, so cast it to a string explicitly (as the
# Kafka integration guide recommends) rather than relying on an implicit
# binary->string coercion inside `split`, which stricter type-coercion
# settings (e.g. ANSI mode) may reject. A null value casts to null and
# `explode` then emits no rows for it.
words = lines.select(
    explode(split(lines.value.cast("string"), " ")).alias("word")
)
# Generate running word count
wordCounts = words.groupBy("word").count()

# Output is what gets written out to the external storage. It can be written in one of three modes:
    Complete Mode: The entire updated Result Table will be written to the external storage.
    Append Mode: Only the new rows appended to the Result Table since the last trigger will be written to the external storage.
    Update Mode: Only the rows that were updated in the Result Table since the last trigger will be written to the external storage.

In [None]:
# Launch the streaming word count. "complete" mode reprints the whole
# result table (every word with its running count) to the console on each trigger.
query = (
    wordCounts.writeStream
    .outputMode("complete")
    .format("console")
    .start()
)
# Block here until the query terminates; otherwise the process could exit
# while the streaming query is still active.
query.awaitTermination()

# API using Datasets and DataFrames
DataFrames and Datasets can represent static, bounded data, as well as streaming, unbounded data. Similar to static Datasets/DataFrames, you can use the common entry point "SparkSession" to create streaming DataFrames/Datasets from streaming sources, and apply the same operations on them as static DataFrames/Datasets.

# Creating streaming DataFrames and streaming Datasets
Streaming DataFrames can be created through the DataStreamReader interface returned by SparkSession.readStream().
## Input source
    File source
    Kafka source
    Socket source (for testing)
    Rate source (for testing)
    Rate Per Micro-Batch source (for testing)

# Operations on streaming DataFrames/Datasets
You can apply all kinds of operations on streaming DataFrames/Datasets, ranging from untyped, SQL-like operations to typed, RDD-like operations (map, filter, flatMap).
See the Spark SQL programming guide: https://spark.apache.org/docs/latest/sql-programming-guide.html

# Kafka integration
See the Structured Streaming + Kafka Integration Guide: https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import window
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType

In [None]:
spark = SparkSession \
    .builder \
    .appName("Spark-test-Kafka") \
    .getOrCreate()

# Reference examples of other subscription styles (kept for reference).
# Note: `selectExpr` returns a NEW DataFrame — assign its result, otherwise
# the cast is silently discarded.

# # Subscribe to 1 topic
# df = spark \
#     .readStream \
#     .format("kafka") \
#     .option("kafka.bootstrap.servers", "kafka-service.default.svc.cluster.local:9092") \
#     .option("subscribe", "pn_classification") \
#     .load()
# df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# # Subscribe to 1 topic, with headers
# df = spark \
#     .readStream \
#     .format("kafka") \
#     .option("kafka.bootstrap.servers", "kafka-service.default.svc.cluster.local:9092") \
#     .option("subscribe", "pn_classification") \
#     .option("includeHeaders", "true") \
#     .load()
# df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")

# # Subscribe to multiple topics
# df = spark \
#     .readStream \
#     .format("kafka") \
#     .option("kafka.bootstrap.servers", "kafka-service.default.svc.cluster.local:9092") \
#     .option("subscribe", "pn_classification,pn_classification_1") \
#     .load()
# df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Subscribe to a pattern: every topic whose name starts with "pn_".
# `subscribePattern` is a Java regex that must match the WHOLE topic name.
# The previous value "pn_*" only matched "pn", "pn_", "pn__", ... and so
# never matched topics such as "pn_classification"; "pn_.*" does.
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka-service.default.svc.cluster.local:9092") \
    .option("subscribePattern", "pn_.*") \
    .option("startingOffsets", "earliest") \
    .load()

In [None]:
# Print the column layout of the pattern-subscribed Kafka stream `df`
# (expected to be the Kafka source's key/value/topic/partition/offset/
# timestamp/timestampType columns — confirm against the printed output).
df.printSchema()

In [None]:
# Schema presumably describing the JSON payload of the Kafka messages —
# confirm against the producer. No active code in this cell applies it.
userSchema = StructType()
for _field_name, _field_type in (
    ("from", "string"),
    ("test-data-Title", "string"),
    ("test-data-device", "string"),
    ("index", "integer"),
    ("time", "string"),
):
    # StructType.add appends the field in place (and returns the same object).
    userSchema.add(_field_name, _field_type)
# Drop the loop temporaries so they do not linger in the notebook namespace.
del _field_name, _field_type

In [None]:
# Echo the raw Kafka records of `df` to the console. "append" mode emits only
# the rows that arrived since the previous trigger.
query = (
    df.writeStream
    .outputMode("append")
    .format("console")
    .start()
)
# Block until the query terminates so the process does not exit while the
# streaming query is still active.
query.awaitTermination()

In [None]:
# Basic operations - Selection, Projection, Aggregation
from pyspark.sql.functions import col

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()

# Custom Schema — presumably the JSON payload of the Kafka messages (confirm
# against the producer). Only referenced by the commented-out from_json line.
userSchema = StructType() \
    .add("from", "string") \
    .add("test-data-Title", "string") \
    .add("test-data-device", "string") \
    .add("index", "integer") \
    .add("time", "string")

# Create DataFrame representing the stream of input
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka-service.default.svc.cluster.local:9092") \
    .option("subscribe", "pn_classification") \
    .load()

print(df.isStreaming) # True for DataFrames that have streaming sources
df.printSchema()

# Decode the binary Kafka `value` with a native cast instead of a Python UDF:
#  - the previous `lambda x: x.decode("utf-8")` raised AttributeError on null
#    values (the printed schema shows `value` is nullable), failing the query;
#    the cast simply yields null;
#  - registering that UDF as "decode" shadowed Spark SQL's built-in `decode`;
#  - a native cast avoids shipping every row through a Python worker.
# Spark casts binary to string as UTF-8; invalid byte sequences are replaced
# rather than raising.
df = df.withColumn("value", col("value").cast("string"))
#df = df.select(from_json(col("value"), userSchema).alias("data")).select("data.*")

# TODO: make unbounded_table (e.g. parse `value` with from_json + userSchema).

query = df \
    .writeStream \
    .format("console") \
    .start()
query.awaitTermination()

True
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [None]:
# Import Spark's column function under an alias: a plain `base64` import would
# shadow the stdlib `base64` module imported at the top of the notebook.
from pyspark.sql.functions import base64 as spark_base64, col

# Add a base64-encoded copy of `value`. The original wrote it to a column with
# an empty name (""), which is easy to collide with and awkward to reference.
df = df.withColumn("value_base64", spark_base64(col("value")))