In [6]:
import pyspark
print(pyspark.__version__)

import logging
from datetime import datetime
import uuid
import time_uuid

import pandas as pd
from cassandra.cluster import Cluster
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

3.3.0


## Build spark

In [7]:
s_conn = SparkSession \
    .builder \
    .appName("SparkDataStreaming") \
    .config("spark.streaming.stopGracefullyOnShutdown", True) \
    .config('spark.jars.packages', "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0,"
                                "com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.3.0") \
    .config("spark.sql.shuffle.partitions", 4) \
    .config('spark.cassandra.connection.host', 'host.docker.internal') \
    .getOrCreate()

## Connect to kafka

In [8]:
spark_df = s_conn.readStream \
    .format('kafka') \
    .option('kafka.bootstrap.servers', 'kafka:29092') \
    .option('subscribe', 'user_created') \
    .option('startingOffsets', 'earliest') \
    .load()

In [13]:
# # Change 'readStream' to 'read' to read kafka
# spark_df = s_conn.read\
#     .format('kafka') \
#     .option('kafka.bootstrap.servers', 'kafka:29092') \
#     .option('subscribe', 'user_created') \
#     .option('startingOffsets', 'earliest') \
#     .load()

# spark_df.printSchema()
# spark_df.show(5,False)

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

+----+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

## Create spark dataframe from kafka

In [9]:
# Define type for each columns
schema = StructType([
    StructField("id", StringType(), True),
    StructField("first_name", StringType(), False),
    StructField("last_name", StringType(), False),
    StructField("gender", StringType(), False),
    StructField("address", StringType(), False),
    StructField("post_code", StringType(), False),
    StructField("email", StringType(), False),
    StructField("username", StringType(), False),
    StructField("dob", TimestampType(), False),
    StructField("registered_date", TimestampType(), False),
    StructField("phone", StringType(), False),
    StructField("picture", StringType(), False)
])

# Define a function converting uuid to datetime (when records are updated)
def convert_uuid_to_datetime(uuid_string):
    uuid_object = uuid.UUID(uuid_string)
    timestamp = time_uuid.TimeUUID(bytes=uuid_object.bytes).get_timestamp()
    datetime_object = datetime.fromtimestamp(timestamp)
    return datetime_object

udf_convert_uuid_to_datetime = udf(lambda z: convert_uuid_to_datetime(z), TimestampType())

# Create Pyspark dataframe from Kafka
sel = spark_df.selectExpr("CAST(value AS STRING)") \
    .select(from_json(col('value'), schema).alias('data')).select("data.*")\
    .withColumn('dob', substring(col('dob'), 1, 19))\
    .withColumn('registered_date', substring(col('registered_date'), 1, 19))\
    .withColumn('update_date', udf_convert_uuid_to_datetime(col('id')))\
    .withColumn('update_date', substring(col('update_date'), 1, 19))

In [5]:
# Define type for each columns
schema = StructType([
    StructField("id", StringType(), True),
    StructField("first_name", StringType(), False),
    StructField("last_name", StringType(), False),
    StructField("gender", StringType(), False),
    StructField("address", StringType(), False),
    StructField("post_code", StringType(), False),
    StructField("email", StringType(), False),
    StructField("username", StringType(), False),
    StructField("dob", TimestampType(), False),
    StructField("registered_date", TimestampType(), False),
    StructField("phone", StringType(), False),
    StructField("picture", StringType(), False)
])

# Define a function converting uuid to datetime (when records are updated)
def convert_uuid_to_datetime(uuid_string):
    uuid_object = uuid.UUID(uuid_string)
    timestamp = time_uuid.TimeUUID(bytes=uuid_object.bytes).get_timestamp()
    datetime_object = datetime.fromtimestamp(timestamp)
    return datetime_object

udf_convert_uuid_to_datetime = udf(lambda z: convert_uuid_to_datetime(z), TimestampType())

# Create Pyspark dataframe from Kafka
sel = spark_df.selectExpr("CAST(value AS STRING)") \
    .select(from_json(col('value'), schema).alias('data')).select("data.*")\
    .withColumn('dob', substring(col('dob'), 1, 19))\
    .withColumn('registered_date', substring(col('registered_date'), 1, 19))\
    .withColumn('update_date', udf_convert_uuid_to_datetime(col('id')))\
    .withColumn('update_date', substring(col('update_date'), 1, 19))

sel.limit(5).toPandas()

Unnamed: 0,id,first_name,last_name,gender,address,post_code,email,username,dob,registered_date,phone,picture,update_date
0,8e799fea-839a-11ee-aa9e-0242ac120002,Judith,Daniels,female,"8781 Novara Avenue, Enniscorthy, South Dublin,...",39798,judith.daniels@example.com,goldengorilla269,1952-08-06 21:48:59,2017-12-14 23:54:32,041-656-4034,https://randomuser.me/api/portraits/med/women/...,2023-11-15 09:37:13
1,8eb3b5a4-839a-11ee-aa9e-0242ac120002,Ege,Özbey,female,"807 Doktorlar Cd, Kırklareli, Hatay, Turkey",99151,ege.ozbey@example.com,sadostrich659,1945-02-02 18:17:02,2010-06-21 09:17:36,(151)-035-0499,https://randomuser.me/api/portraits/med/women/...,2023-11-15 09:37:13
2,8eebed48-839a-11ee-aa9e-0242ac120002,Branka,Spasić,female,"1917 Porodice Stanković, Plandište, Mačva, Serbia",78794,branka.spasic@example.com,smallleopard291,1947-01-11 05:33:39,2017-03-07 09:17:33,018-9648-429,https://randomuser.me/api/portraits/med/women/...,2023-11-15 09:37:14
3,8f23229a-839a-11ee-aa9e-0242ac120002,Maria,Walker,female,"7640 Wai-Iti Road, Wellington, Southland, New ...",13010,maria.walker@example.com,goldenmouse447,1989-10-21 07:39:55,2012-10-27 21:40:14,(340)-465-9882,https://randomuser.me/api/portraits/med/women/...,2023-11-15 09:37:14
4,8f591670-839a-11ee-aa9e-0242ac120002,Evaristo,Viana,male,"4641 Rua Santa Luzia , Paço do Lumiar, Minas G...",12803,evaristo.viana@example.com,lazyduck988,1971-05-24 12:39:02,2005-12-12 02:33:52,(67) 9093-5159,https://randomuser.me/api/portraits/med/men/10...,2023-11-15 09:37:14


## Create cassandra connection

In [10]:
cluster = Cluster(['host.docker.internal'])
cas_session = cluster.connect()
cas_session

<cassandra.cluster.Session at 0x7fd449884fa0>

### Create keyspace

In [11]:
cas_session.execute("""
    CREATE KEYSPACE IF NOT EXISTS spark_streams
    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
""")

<cassandra.cluster.ResultSet at 0x7fd44995b610>

### Create table

In [12]:
cas_session.execute("""
    CREATE TABLE IF NOT EXISTS spark_streams.created_users (
        id UUID PRIMARY KEY,
        first_name TEXT,
        last_name TEXT,
        gender TEXT,
        address TEXT,
        post_code TEXT,
        email TEXT,
        username TEXT,
        dob TIMESTAMP,
        registered_date TIMESTAMP,
        phone TEXT,
        picture TEXT,
        update_date TIMESTAMP);
""")

<cassandra.cluster.ResultSet at 0x7fd44995b4c0>

### Insert_data to cassandra

In [13]:
streaming_query = (sel.writeStream.format("org.apache.spark.sql.cassandra")
                   .option('checkpointLocation', '/tmp/checkpoint')
                   .option('keyspace', 'spark_streams')
                   .option('table', 'created_users')
                   .start())

streaming_query.awaitTermination()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/local/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

## Read cassandra spark dataframe

In [14]:
cassandra_options = {
    "keyspace": "spark_streams",
    "table": "created_users"
}

cass_df = s_conn.read \
    .format("org.apache.spark.sql.cassandra") \
    .options(**cassandra_options) \
    .load()

cass_df.show(5,False)

+------------------------------------+----------------------------------------------------------+-------------------+----------------------------+----------+------+---------+--------------+----------------------------------------------------+---------+-------------------+-------------------+-----------------+
|id                                  |address                                                   |dob                |email                       |first_name|gender|last_name|phone         |picture                                             |post_code|registered_date    |update_date        |username         |
+------------------------------------+----------------------------------------------------------+-------------------+----------------------------+----------+------+---------+--------------+----------------------------------------------------+---------+-------------------+-------------------+-----------------+
|9b1824d8-839a-11ee-aa9e-0242ac120002|1875 Stevens Creek Blvd, Hele