In [None]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1,org.apache.spark:spark-avro_2.12:3.3.1 pyspark-shell'

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import  *

spark = SparkSession \
    .builder \
    .appName("Spark-Notebook") \
    .getOrCreate()

In [None]:
# default for startingOffsets is "latest"
df_kafka_raw = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092,broker:29092") \
    .option("subscribe", "ashraf-form-submissions") \
    .option("startingOffsets", "earliest") \
    .option("checkpointLocation", "checkpoint") \
    .load()

In [None]:
df_kafka_raw.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [68]:
df_kafka_raw.show()

AnalysisException: Queries with streaming sources must be executed with writeStream.start();
kafka

In [None]:
df_kafka_encoded = df_kafka_raw.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)")

In [None]:
from pyspark.sql import functions as F

from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StringType

# Convert the 'value' column from binary to string
df_kafka_raw = df_kafka_raw.withColumn("value_str", df_kafka_raw["value"].cast("string"))

# Define the schema of your JSON data
json_schema = "module_name string, module_id string, email string, time_homework string, time_lectures string, score string"

# Apply the transformation to create separate columns for each field
df_with_json = df_kafka_raw.select(
    "*",
    from_json(col("value_str"), json_schema).alias("value_json")
).select(
    "*",
    "value_json.*"
).drop("value_str", "value_json")

# Show the schema of the DataFrame with separate columns for each field
# Selecting columns and casting their types
df_with_json = df_with_json.select(
    'module_name',
    'module_id',
    'email',
    col('time_homework').cast('float').alias('time_homework'),
    col('time_lectures').cast('float').alias('time_lectures'),
    col('score').cast('float').alias('score'),
    'timestamp',
    'offset'
)


root
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)
 |-- Module_ID: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Module_Name: string (nullable = true)
 |-- Time_Spent_Homework: string (nullable = true)
 |-- Time_Spent_Lectures: string (nullable = true)
 |-- Scores: string (nullable = true)



In [66]:
def sink_console(df, output_mode: str = 'complete', processing_time: str = '5 seconds'):
    write_query = df.writeStream \
        .outputMode(output_mode) \
        .trigger(processingTime=processing_time) \
        .format("console") \
        .option("truncate", False) \
        .start()
    return write_query # pyspark.sql.streaming.StreamingQuery

In [67]:
write_query = sink_console(df_with_json, output_mode='append')

24/02/18 12:40:57 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-bee813fc-1504-435d-92c5-eee61c32b79b. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/02/18 12:40:57 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/02/18 12:40:57 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-b5894c10-1ffc-40bc-831d-759d141d9957-1875154463-executor-18, groupId=spark-kafka-source-b5894c10-1ffc-40bc-831d-759d141d9957-1875154463-executor] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
24/02/18 12:40:57 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-b5894c10-1ffc-40bc-831d-759d141d9957-1875154463-exe

                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+--------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+-----+-----------+-------------------+-------------------+------+
|key           |value                                                                                                                                                                         |Module_ID|Email|Module_Name|Time_Spent_Homework|Time_Spent_Lectures|Scores|
+--------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+-----+-----------+-------------------+-------------------+------+
|m1-abc@abc.com|Module Name: Module 1: Containerization and Infrastructure as Code, Module ID: m1, Use

In [19]:
def sink_memory(df, query_name, query_template):
    write_query = df \
        .writeStream \
        .queryName(query_name) \
        .format('memory') \
        .start()
    query_str = query_template.format(table_name=query_name)
    query_results = spark.sql(query_str)
    return write_query, query_results

In [29]:
query_name = 'user_email_counts'
query_template = 'select count(email) from {table_name}'
write_query, df_user_email_counts = sink_memory(df=df_rides, query_name=query_name, query_template=query_template)

24/02/18 12:21:19 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-e1d8f575-72a9-4afb-9ae4-d3d911109956. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/02/18 12:21:19 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/02/18 12:21:19 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-3c206947-f444-4523-8404-fc7e996a7c14-709777695-driver-0-7, groupId=spark-kafka-source-3c206947-f444-4523-8404-fc7e996a7c14-709777695-driver-0] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
24/02/18 12:21:19 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-3c206947-f444-4523-8404-fc7e996a7c14-709777695-driver-

                                                                                

In [30]:
df_user_email_counts.show()

+------------+
|count(email)|
+------------+
|           2|
+------------+



In [28]:
write_query.stop()