## 01 Setup
1. Build Spark Session
2. Build Cassandra Session

In [1]:
import logging

from cassandra.cluster import Cluster
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType

s_conn = None
s_conn = SparkSession.builder \
    .appName('SparkDataStreaming') \
    .config('spark.jars.packages', "com.datastax.spark:spark-cassandra-connector_2.12:3.4.1,"
                                    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1") \
    .config('spark.cassandra.connection.host', 'localhost') \
    .getOrCreate()

s_conn.sparkContext.setLogLevel("ERROR")

# connecting to the cassandra cluster
try:
    cluster = Cluster(['localhost'])

    cas_session = cluster.connect()
except Exception as e:
    logging.error(f"Could not create cassandra connection due to {e}")

## 02 Contruct the Expected Schema


In [2]:
schema = StructType([
    StructField("id", StringType(), False),
    StructField("first_name", StringType(), False),
    StructField("last_name", StringType(), False),
    StructField("gender", StringType(), False),
    StructField("address", StringType(), False),
    StructField("post_code", StringType(), False),
    StructField("email", StringType(), False),
    StructField("username", StringType(), False),
    StructField("registered_date", StringType(), False),
    StructField("phone", StringType(), False),
    StructField("picture", StringType(), False)
])

In [3]:
cas_session.execute("""
    CREATE KEYSPACE IF NOT EXISTS spark_streams
    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
""")

print("Keyspace created successfully!")

Keyspace created successfully!


In [70]:
spark_view_df = s_conn.readStream \
    .format('kafka') \
    .option('kafka.bootstrap.servers', 'localhost:9092') \
    .option('subscribe', 'users_created') \
    .option('startingOffsets', 'latest') \
    .load()
spark_select_df = spark_view_df.selectExpr("CAST(value AS STRING)") 

In [71]:
spark_select_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", False) \
    .option("numRows", 50) \
    .start()

query = None # Initialize to insure we can check if it was started

## 03a Experiment: Consume Stream query in a Notebook
### Results:
You can write the stream to "memory" with function ".format", then name the query in memory to be accessed later via a Spark-SQL query.
Use ".show()", ".head(10)" or ".collect()" to pull the query into a usable or viewable data set.

### Notes:
    - If you recreate the writeStream aka variable "spark_select_df" then the queryName = "streaming_df" will already exists and fail.
    - Also, if you recreate the readStream , then since the stream already consumed the data then nothing will show up.
        - Only true, if the startingOffset is set to "latest" and not "earliest"


In [76]:
if query is not None and query.isActive:
    print('Query already active')
else:
    query = spark_select_df.writeStream \
        .outputMode("append") \
        .format("memory") \
        .queryName("streaming_df7").start()

# To view the results, you can then query the in-memory table
s_conn.sql("SELECT * FROM streaming_df7").head(5)

Query already active


[Row(value='{"id": 228627584116828766513234465659512718569, "first_name": "Ridwan", "last_name": "Aga", "gender": "male", "address": "4417 \\u00d8vre Ullern terrasse, Fagernes, Troms - Romsa, Norway", "post_code": "4003", "email": "ridwan.aga@example.com", "username": "blackkoala405", "dob": "1987-02-11T21:29:24.669Z", "registered_date": "2013-04-18T09:59:05.607Z", "phone": "64108927", "picture": "https://randomuser.me/api/portraits/med/men/12.jpg"}'),
 Row(value='{"id": 36548244493338199433306678270268393268, "first_name": "Ernesto", "last_name": "Fuentes", "gender": "male", "address": "1962 Calle de Bravo Murillo, Parla, Melilla, Spain", "post_code": 23576, "email": "ernesto.fuentes@example.com", "username": "smallgorilla441", "dob": "1948-05-28T20:11:06.089Z", "registered_date": "2022-01-28T09:57:57.803Z", "phone": "972-304-465", "picture": "https://randomuser.me/api/portraits/med/men/36.jpg"}')]

In [64]:
s_conn.sql("SELECT * FROM streaming_df").head(5)

[Row(value='{"id": 247817924588245923341384306894570166991, "first_name": "Ida", "last_name": "Nguyen", "gender": "female", "address": "8085 Pockrus Page Rd, Roseville, California, United States", "post_code": 36146, "email": "ida.nguyen@example.com", "username": "redlion718", "dob": "1982-08-07T00:02:02.743Z", "registered_date": "2009-05-16T23:40:21.802Z", "phone": "(477) 278-5931", "picture": "https://randomuser.me/api/portraits/med/women/10.jpg"}'),
 Row(value='{"id": 174284896883338618945951761980772274025, "first_name": "L\\u00e6rke", "last_name": "Larsen", "gender": "female", "address": "2738 Poppel Alle, K\\u00f8benhavn V, Sj\\u00e6lland, Denmark", "post_code": 83832, "email": "laerke.larsen@example.com", "username": "bigfrog983", "dob": "1963-09-02T22:01:34.412Z", "registered_date": "2017-02-01T01:27:03.264Z", "phone": "85613719", "picture": "https://randomuser.me/api/portraits/med/women/31.jpg"}'),
 Row(value='{"id": 214859958238539815279719897260767091546, "first_name": "Myrt

## 03b Experiment: Read from the same stream under a different behavior.
Result: The stream will be empty as long as no new records were produced on the topic.

In [None]:
query_same_stream = spark_select_df.writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("streaming_same_df").start()

# To view the results, you can then query the in-memory table
s_conn.sql("SELECT * FROM streaming_same_df").show()

+-----+
|value|
+-----+
+-----+



## 04 Stream into a new dataframe with JSON parsed

In [None]:
spark_df = s_conn.readStream \
    .format('kafka') \
    .option('kafka.bootstrap.servers', 'localhost:9092') \
    .option('subscribe', 'users_created') \
    .option('startingOffsets', 'earliest') \
    .load()

sel = spark_df.selectExpr("CAST(value AS STRING)") \
    .select(from_json(col('value'), schema).alias('data')).select("data.*")
print(sel)

In [None]:
s_table_name = 'created_users1'
cas_session.execute(f"""
    CREATE TABLE IF NOT EXISTS spark_streams.{s_table_name} (
        id TEXT PRIMARY KEY,
        first_name TEXT,
        last_name TEXT,
        gender TEXT,
        address TEXT,
        post_code TEXT,
        email TEXT,
        username TEXT,
        registered_date TEXT,
        phone TEXT,
        picture TEXT);
""")

print("Table created successfully!")

Table created successfully!


In [None]:
streaming_query = (sel.writeStream.format("org.apache.spark.sql.cassandra")
                               .option('checkpointLocation', '/tmp/checkpoint')
                               .option('keyspace', 'spark_streams')
                               .option('table', 'created_users2')
                               .start())

streaming_query.isActive()
# streaming_query.awaitTermination()