## Spark job to load data from Kafka into Cassandra

In [14]:
import logging
from uuid import uuid4
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructField, StructType, StringType

In [15]:
# Configure the root logger at INFO level so the progress messages logged by
# the functions in this notebook are displayed.
logging.basicConfig(level=logging.INFO)

def create_keyspace(session):
    """Create the ``spark_streams`` keyspace if it does not already exist.

    The CQL statement is sent through ``session.execute``. Any exception is
    logged as an error and swallowed, so the function always returns ``None``.
    """
    statement = """ 
        CREATE KEYSPACE IF NOT EXISTS spark_streams 
        WITH replication = {'class':'SimpleStrategy', 'replication_factor':'1'} 
        """
    try:
        session.execute(statement)
    except Exception as err:
        logging.error(f"Failed to create keyspace due to: {err}")
    else:
        logging.info("Keyspace created successfully")

In [16]:
def create_table(session):
    """Create the ``spark_streams.created_users`` table if it is missing.

    The table is keyed by a UUID ``id`` column; every other column is TEXT.
    Any exception raised by ``session.execute`` is logged as an error and
    swallowed, so the function always returns ``None``.
    """
    ddl = """ 
        CREATE TABLE IF NOT EXISTS spark_streams.created_users ( 
            id UUID PRIMARY KEY, 
            first_name TEXT, 
            last_name TEXT, 
            address TEXT, 
            email TEXT, 
            username TEXT, 
            dob TEXT, 
            phone TEXT, 
            profile_pic TEXT, 
            nationality TEXT 
        ) 
        """
    try:
        session.execute(ddl)
    except Exception as err:
        logging.error(f"Failed to create table due to: {err}")
    else:
        logging.info("Table created successfully")

In [24]:
from uuid import uuid4  # Ensure this is imported

def insert_data(session, **kwargs):
    """Insert one user record into ``spark_streams.created_users``.

    Args:
        session: Object exposing ``execute(query, parameters)``; the driver
            script passes the Cassandra session here.
        **kwargs: Column values keyed by column name. Missing text columns
            are inserted as ``None``.

    The row key is taken from ``kwargs["id"]`` when it is a ``UUID`` or a
    string that parses as one, so re-inserting the same record overwrites
    the existing row instead of adding a duplicate. When the id is missing
    or malformed a random UUID is generated instead.

    Any exception raised by the insert is logged as an error and swallowed;
    the function always returns ``None``.
    """
    # Function-scope import keeps this cell runnable on its own.
    from uuid import UUID, uuid4

    incoming_id = kwargs.get("id")
    if isinstance(incoming_id, UUID):
        row_id = incoming_id
    else:
        try:
            row_id = UUID(str(incoming_id))
        except ValueError:
            # No usable id on the record (absent, empty or malformed).
            row_id = uuid4()

    text_columns = (
        "first_name",
        "last_name",
        "address",
        "email",
        "username",
        "dob",
        "phone",
        "profile_pic",
        "nationality",
    )
    values = (row_id,) + tuple(kwargs.get(column) for column in text_columns)

    try:
        session.execute(""" 
        INSERT INTO spark_streams.created_users(id, first_name, last_name, address, email, username, dob, phone, profile_pic, nationality) 
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) 
        """, values)
        logging.info("Data inserted successfully")
    except Exception as e:
        logging.error(f"Could not add the data due to {e}")

In [18]:
def create_spark_connection():
    """Build (or reuse) the SparkSession used by this job.

    The session is named ``SparkDataBatch``, requests the Cassandra connector
    and Kafka SQL packages, and points the Cassandra connector at
    ``localhost``. The Spark log level is then set to ``ERROR``.

    Returns:
        The SparkSession, or ``None`` if anything raised while creating it
        (the error is logged).
    """
    jar_packages = (
        "com.datastax.spark:spark-cassandra-connector_2.12:3.5.1,"
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1"
    )
    try:
        builder = SparkSession.builder.appName('SparkDataBatch')
        builder = builder.config('spark.jars.packages', jar_packages)
        builder = builder.config('spark.cassandra.connection.host', 'localhost')
        spark = builder.getOrCreate()

        spark.sparkContext.setLogLevel("ERROR")
        logging.info("Spark Connection created successfully")
        return spark
    except Exception as err:
        logging.error(f"An error occurred while creating Spark connection: {err}")
        return None


In [19]:
def cassandra_connection():
    """Open a Cassandra session against the node on ``localhost``.

    Returns:
        The connected session, or ``None`` if the cluster object could not
        be created or the connection attempt failed (the error is logged).

    If ``connect()`` fails after the ``Cluster`` was created, the cluster is
    shut down before returning so it does not linger without an owner.
    """
    cluster = None
    try:
        cluster = Cluster(['localhost'])  # Using localhost for Cassandra
        session = cluster.connect()
    except Exception as e:
        logging.error(f"Couldn't establish Cassandra connection due to {e}")
        if cluster is not None:
            # Nobody else holds a reference to this cluster; release it here.
            cluster.shutdown()
        return None

    logging.info("Cassandra connection established successfully")
    return session

In [20]:
def connect_to_kafka(spark_conn):
    """Read the ``users_created`` Kafka topic into a DataFrame.

    Uses the batch reader (``spark_conn.read``) against the broker at
    ``localhost:9092``, starting from the earliest offsets.

    Returns:
        The loaded DataFrame, or ``None`` if building or loading the reader
        raised (the error is logged).
    """
    try:
        kafka_df = (
            spark_conn.read
            .format('kafka')
            .option('kafka.bootstrap.servers', 'localhost:9092')
            .option('subscribe', 'users_created')
            .option('startingOffsets', 'earliest')
            .load()
        )
    except Exception as err:
        logging.error(f"Error creating the DataFrame from Kafka: {err}")
        return None
    else:
        logging.info("Kafka DataFrame created successfully")
        return kafka_df
     

In [21]:
def create_selection_df_from_kafka(spark_df):
    """Parse the Kafka ``value`` column as JSON into one column per user field.

    Args:
        spark_df: DataFrame loaded from Kafka; its ``value`` column is cast
            to a string and parsed.

    Returns:
        A DataFrame with the ten string columns listed below, or ``None`` if
        building the parsed selection raised (the error is logged).

    Side effect: the raw string values are printed with ``show`` for
    debugging before parsing. That call is outside the ``try`` block, so an
    exception from it propagates to the caller.
    """
    field_names = (
        'id',
        'first_name',
        'last_name',
        'address',
        'email',
        'username',
        'dob',
        'phone',
        'profile_pic',
        'nationality',
    )
    schema = StructType(
        [StructField(name, StringType(), False) for name in field_names]
    )

    # Keep only the message payload, as text.
    raw_df = spark_df.selectExpr('CAST(value AS STRING)')

    # Dump the raw messages to the console for debugging.
    raw_df.show(truncate=False)

    try:
        parsed_column = from_json(col('value'), schema).alias('data')
        selection = raw_df.select(parsed_column).select("data.*")
        logging.info("Selection DataFrame created successfully")
        logging.info("Schema matches expected structure.")
    except Exception as err:
        logging.error(f"Schema mismatch or other error occurred: {err}")
        return None

    return selection

In [22]:
def write_to_cassandra(select_df, session):
    """Collect the parsed rows and insert each one into Cassandra.

    Args:
        select_df: DataFrame of parsed user records, or ``None`` when the
            upstream parsing step failed.
        session: Cassandra session forwarded to ``insert_data``.

    Rows whose fields are all ``None`` are skipped and counted: they carry
    no user data (presumably Kafka messages that did not parse as the
    expected JSON), and inserting them would only create empty rows.

    Any exception raised while collecting or inserting is logged as an error
    and swallowed; the function always returns ``None``.
    """
    if select_df is None:
        # Report the real cause instead of an AttributeError on ``None``.
        logging.error("Failed to write data to Cassandra: no DataFrame to write")
        return

    skipped = 0
    try:
        for row in select_df.collect():
            record = row.asDict()
            if all(value is None for value in record.values()):
                skipped += 1
                continue
            insert_data(session, **record)
        if skipped:
            logging.warning(f"Skipped {skipped} row(s) with no parsed fields")
        logging.info("Data written to Cassandra successfully")
    except Exception as e:
        logging.error(f"Failed to write data to Cassandra: {e}")

In [25]:
# Driver: connect to Spark and Cassandra, make sure the keyspace and table
# exist, read the Kafka topic, parse it and write the rows to Cassandra.
spark_conn = create_spark_connection()
session = cassandra_connection()

try:
    # Both helpers return None on failure; only proceed when both succeeded.
    if spark_conn is not None and session is not None:
        create_keyspace(session)
        create_table(session)

        df = connect_to_kafka(spark_conn)
        if df is not None:
            select_df = create_selection_df_from_kafka(spark_df=df)

            # Parsing logs its own error and returns None on failure.
            if select_df is not None:
                write_to_cassandra(select_df, session)
finally:
    # Always release the Cassandra resources, even if a step above raised.
    if session is not None:
        session.shutdown()
        # The session is the only handle kept on the cluster that created it.
        session.cluster.shutdown()

INFO:root:Spark Connection created successfully
INFO:cassandra.policies:Using datacenter 'datacenter1' for DCAwareRoundRobinPolicy (via host '::1:9042'); if incorrect, please specify a local_dc to the constructor, or limit contact points to local cluster nodes
INFO:cassandra.cluster:Cassandra host 127.0.0.1:9042 removed
INFO:root:Cassandra connection established successfully
INFO:root:Keyspace created successfully
INFO:root:Table created successfully
INFO:root:Kafka DataFrame created successfully
INFO:root:Selection DataFrame created successfully
INFO:root:Schema matches expected structure.


+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             

INFO:root:Data inserted successfully
INFO:root:Data inserted successfully
INFO:root:Data inserted successfully
INFO:root:Data inserted successfully
INFO:root:Data inserted successfully
INFO:root:Data inserted successfully
INFO:root:Data inserted successfully
INFO:root:Data inserted successfully
INFO:root:Data inserted successfully
INFO:root:Data inserted successfully
INFO:root:Data inserted successfully
INFO:root:Data inserted successfully
INFO:root:Data inserted successfully
INFO:root:Data inserted successfully
INFO:root:Data inserted successfully
INFO:root:Data inserted successfully
INFO:root:Data inserted successfully
INFO:root:Data inserted successfully
INFO:root:Data inserted successfully
INFO:root:Data inserted successfully
INFO:root:Data inserted successfully
INFO:root:Data inserted successfully
INFO:root:Data inserted successfully
INFO:root:Data inserted successfully
INFO:root:Data inserted successfully
INFO:root:Data inserted successfully
INFO:root:Data inserted successfully
I