In [1]:
from pyspark.sql.functions import from_json, col, to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql import SparkSession

In [2]:
def write_to_cassandra(target_df, batch_id, keyspace="spark_db", table="customer_search", show_rows=True):
    """Append one streaming micro-batch to a Cassandra table.

    Intended as the ``foreachBatch`` callback of a streaming query: Spark
    calls it once per micro-batch with the batch DataFrame and its id.

    Args:
        target_df: the micro-batch DataFrame supplied by Spark.
        batch_id: the micro-batch id supplied by Spark (not used here).
        keyspace: Cassandra keyspace to write into.
        table: Cassandra table to write into.
        show_rows: if True, print the batch to stdout after writing
            (debug aid; matches the original behavior).

    Raises:
        Whatever the Cassandra write raises; the batch is unpersisted first.
    """
    # The batch is consumed by up to two actions (save + show). Persist it so
    # the Kafka read and JSON parsing are not recomputed for the second one.
    target_df.persist()
    try:
        target_df.write \
            .format("org.apache.spark.sql.cassandra") \
            .option("keyspace", keyspace) \
            .option("table", table) \
            .mode("append") \
            .save()
        if show_rows:
            target_df.show()
    finally:
        # Release the cached batch even if the write failed.
        target_df.unpersist()

In [3]:
# Local Spark session with the DataStax Cassandra connector enabled.
spark = (
    SparkSession.builder
    .master("local[3]")
    .appName("Stream Table Join Demo")
    # NOTE(review): this key is documented for DStream-based Spark Streaming;
    # confirm it has any effect on the Structured Streaming queries below.
    .config("spark.streaming.stopGracefullyOnShutdown", "true")
    # Small shuffle partition count for a local run.
    .config("spark.sql.shuffle.partitions", 2)
    # Cassandra endpoint used by the connector.
    .config("spark.cassandra.connection.host", "localhost")
    .config("spark.cassandra.connection.port", "9042")
    # Connector SQL extensions plus a Cassandra catalog registered as `lh`.
    .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions")
    .config("spark.sql.catalog.lh", "com.datastax.spark.connector.datasource.CassandraCatalog")
    .getOrCreate()
)

In [4]:
# Schema of the JSON payload in each Kafka record. Every field is parsed as a
# string; search_date is converted to a timestamp in a later step.
search_schema = StructType(
    [
        StructField(field_name, StringType())
        for field_name in (
            "id",
            "customer_id",
            "customer_name",
            "product_searched",
            "search_date",
            "country_name",
            "state",
        )
    ]
)

In [5]:
# Streaming source: raw records from the Kafka topic. The `value` column
# carries the JSON payload parsed in the next cell.
kafka_source_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "product-customer-qty")
    # Start from the beginning of the topic when no checkpoint exists yet.
    .option("startingOffsets", "earliest")
    # Keep running instead of failing if offsets are no longer available.
    .option("failOnDataLoss", False)
    .load()
)

In [6]:
# Decode the Kafka `value` bytes as a string and parse the JSON with
# search_schema into a single struct column, again named `value`.
value_df = kafka_source_df.select(
    from_json(
        col("value").cast("string"),
        search_schema,
    ).alias("value")
)

In [7]:
# Flatten the parsed struct into top-level columns and turn search_date from
# a "yyyy-MM-dd HH:mm:ss" string into a timestamp.
prod_customer_df = (
    value_df
    .select("value.*")
    .withColumn(
        "search_date",
        to_timestamp(col("search_date"), "yyyy-MM-dd HH:mm:ss"),
    )
)

In [8]:
# Print the flattened schema as a sanity check: all fields are strings except
# search_date, which should now be a timestamp.
prod_customer_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- product_searched: string (nullable = true)
 |-- search_date: timestamp (nullable = true)
 |-- country_name: string (nullable = true)
 |-- state: string (nullable = true)



In [9]:
# Explicit column list for the sinks below; presumably it must line up with
# the Cassandra table's columns — confirm against the table definition.
# NOTE: an earlier draft also derived an integer `idNum` from customer_id and
# cast `id` to int; those casts are currently disabled.
output_df = prod_customer_df.select(
    "id",
    "customer_id",
    "customer_name",
    "product_searched",
    "search_date",
    "country_name",
    "state",
)

In [10]:
# Debug sink: print every micro-batch of output_df to the console.
# This query needs its OWN checkpoint directory. Checkpoints must never be
# shared between streaming queries, and the Cassandra query in a later cell
# uses ./checkpoints/cassandra-proj. Sharing it would give both queries the
# same query id and offset log.
notification_writer_query = output_df.writeStream \
        .format("console") \
        .outputMode("append") \
        .option("truncate", "false") \
        .option("checkpointLocation", "./checkpoints/cassandra-proj-console/") \
        .start()

# Deliberately not awaited: start() returns immediately, so later cells can
# start the Cassandra query while this one keeps running in the background.

In [11]:
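# Aggregations (draft, currently disabled).
# TODO: before enabling, import `count` (and `sum`) from pyspark.sql.functions.
# `name` and `idNum` are not columns of output_df; idNum only existed in the
# commented-out cast of the output_df cell.

# agg_output_df = output_df.groupBy("country_name", "product_searched")\
#       .agg(count("id").alias("search_qty"))

# agg_search_locations_df = output_df.groupBy("name")\
#      .agg(sum("idNum"), count("id"))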

In [12]:
# Main sink: hand each micro-batch to write_to_cassandra, triggered once per
# minute. The stream has no aggregation, so "append" is the natural output mode;
# Spark treats "update" the same way for stateless queries.
# The checkpoint directory records this query's Kafka progress and must not
# be shared with any other running query.
output_query = output_df.writeStream \
        .foreachBatch(write_to_cassandra) \
        .outputMode("append") \
        .option("checkpointLocation", "./checkpoints/cassandra-proj") \
        .trigger(processingTime="1 minute") \
        .start()

In [None]:
# Block this cell until output_query stops. No timeout is given, so this waits
# indefinitely. If the query fails, the failure is raised here.
# Other queries started earlier in this session keep running independently.
output_query.awaitTermination()

+---+-----------+-------------+--------------------+-------------------+--------------+-------------------+
| id|customer_id|customer_name|    product_searched|        search_date|  country_name|              state|
+---+-----------+-------------+--------------------+-------------------+--------------+-------------------+
|422|         11|       Emilia|         Samsung Tab|2023-04-29 11:23:45|        Zambia|Copperbelt Province|
|423|         10|        Emona|           Bed Sheet|2023-04-29 11:24:15|Czech Republic|             Semily|
|424|         14|       Mirtha|  Asus Gaming Laptop|2023-04-29 11:24:45|    Azerbaijan|   Agstafa District|
|425|         19|    Johnathan|          Fire Stick|2023-04-29 11:25:15|     Lithuania|    Šiauliai County|
|426|          6|       Emilio|     Ninja Air Frier|2023-04-29 11:25:45| United States|             Alaska|
|427|          9|          Jox|Samsung Galaxy s2...|2023-04-29 11:26:15|        Latvia| Ludza Municipality|
+---+-----------+-----------