In [18]:
# Create the Spark Session
from pyspark.sql import SparkSession

spark = (
    SparkSession 
    .builder 
    .appName("Streaming from Kafka") 
    .config("spark.streaming.stopGracefullyOnShutdown", True)
    .config('spark.jars.packages', "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0,com.datastax.spark:spark-cassandra-connector_2.12:3.3.0")
    #.config("spark.jars.packages", "com.datastax.spark:spark-cassandra-connector_2.12:3.3.0")
    .config('spark.cassandra.connection.host', ' 172.18.0.2')
    .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions")
    .config("spark.sql.catalog.myCatalog", "com.datastax.spark.connector.datasource.CassandraCatalog")
    .config("spark.sql.shuffle.partitions", 4)
    .master("local[*]") 
    .getOrCreate()
)

spark

In [19]:
# Create the kafka_df to read from kafka

kafka_df = (
    spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:29092")
    .option("subscribe", "users_created")
    .option("startingOffsets", "earliest")
    .load()
)

In [20]:
# View schema for raw kafka_df
kafka_df.printSchema()
#kafka_df.show()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [21]:
from pyspark.sql.types import StringType, StructField, StructType, ArrayType, LongType
schema = StructType([
        #StructField("id", StringType(), False),
        StructField("first_name", StringType(), False),
        StructField("last_name", StringType(), False),
        StructField("gender", StringType(), False),
        StructField("address", StringType(), False),
        StructField("post_code", StringType(), False),
        StructField("email", StringType(), False),
        StructField("username", StringType(), False),
        StructField("registered_date", StringType(), False),
        StructField("phone", StringType(), False),
        StructField("picture", StringType(), False)
    ])


In [22]:
from pyspark.sql.functions import from_json,col
streaming_df = kafka_df.selectExpr("CAST(value AS STRING)") \
        .select(from_json(col('value'), schema).alias('data')).select("data.*")

In [23]:
import logging
from cassandra.cluster import Cluster

In [24]:
def create_keyspace(session):
    session.execute("""
        CREATE KEYSPACE IF NOT EXISTS spark_streams
        WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
    """)

    print("Keyspace created successfully!")

In [25]:
def create_table(session):
    session.execute("""
    CREATE TABLE IF NOT EXISTS spark_streams.created_users (
        first_name TEXT,
        last_name TEXT,
        gender TEXT,
        address TEXT,
        post_code TEXT,
        email TEXT,
        username TEXT,
        registered_date TEXT,
        phone TEXT,
        picture TEXT,
        PRIMARY KEY (phone));
    """)

    print("Table created successfully!")

In [26]:
def insert_data(session, **kwargs):
    print("inserting data...")

    first_name = kwargs.get('first_name')
    last_name = kwargs.get('last_name')
    gender = kwargs.get('gender')
    address = kwargs.get('address')
    postcode = kwargs.get('post_code')
    email = kwargs.get('email')
    username = kwargs.get('username')
    dob = kwargs.get('dob')
    registered_date = kwargs.get('registered_date')
    phone = kwargs.get('phone')
    picture = kwargs.get('picture')

    try:
        session.execute("""
            INSERT INTO spark_streams.created_users(first_name, last_name, gender, address, 
                post_code, email, username, dob, registered_date, phone, picture)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """, (first_name, last_name, gender, address,
              postcode, email, username, dob, registered_date, phone, picture))
        logging.info(f"Data inserted for {first_name} {last_name}")

    except Exception as e:
        logging.error(f'could not insert data due to {e}')

In [30]:
def create_cassandra_connection():
    try:
        # connecting to the cassandra cluster
        cluster = Cluster(['172.18.0.2'])

        cas_session = cluster.connect()

        return cas_session
    except Exception as e:
        logging.error(f"Could not create cassandra connection due to {e}")
        return None

In [31]:
# To the schema of the data, place a sample json file and change readStream to read 
streaming_df.printSchema()
#streaming_df.show(truncate=False)

root
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- address: string (nullable = true)
 |-- post_code: string (nullable = true)
 |-- email: string (nullable = true)
 |-- username: string (nullable = true)
 |-- registered_date: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- picture: string (nullable = true)



In [None]:
session = create_cassandra_connection()
if session is not None:
    create_keyspace(session)
    create_table(session)

    logging.info("Streaming is being started...")

    streaming_query = (streaming_df.writeStream.format("org.apache.spark.sql.cassandra")
                               .outputMode("append")
                               .option('keyspace', 'spark_streams')
                               .option('table', 'created_users')
                               .option('checkpointLocation', "checkpoint_dir_kafka")
                               .start().awaitTermination())

    



Keyspace created successfully!
Table created successfully!


In [21]:
# To the schema of the data, place a sample json file and change readStream to read 
streaming_df.printSchema()
#streaming_df.show(truncate=False)

root
 |-- id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- address: string (nullable = true)
 |-- post_code: string (nullable = true)
 |-- email: string (nullable = true)
 |-- username: string (nullable = true)
 |-- registered_date: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- picture: string (nullable = true)



In [22]:
# Write the output to console sink to check the output

(streaming_df
 .writeStream
 .format("console")
 .outputMode("append")
 .option("checkpointLocation", "checkpoint_dir_kafka")
 .start()
 .awaitTermination())

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/opt/conda/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 