In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
import os
import pyspark

# Maven coordinates for the Kafka connector. The connector artifact must match
# the Spark version of the installed pyspark package.
# NOTE(review): the Scala suffix must match the Scala build of the cluster's
# Spark distribution — TODO confirm '2.12' is correct.
scala_version = '2.12'
spark_version = pyspark.__version__

packages = [
    f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}',
    'org.apache.kafka:kafka-clients:3.5.1',
]

# PySpark reads PYSPARK_SUBMIT_ARGS when it launches the JVM, so this must run
# before the first SparkSession is created. Pre-existing args are left untouched
# (and then the Kafka packages above are NOT added).
args = os.environ.get('PYSPARK_SUBMIT_ARGS', '')
if args:
    print(f'Found existing args: {args}')
else:
    args = f'--packages {",".join(packages)}'
    print('Using packages', packages)
    os.environ['PYSPARK_SUBMIT_ARGS'] = f'{args} pyspark-shell'

# Connect to the standalone cluster and write event logs to HDFS.
# NOTE(review): spark.history.fs.logDirectory is read by the History Server,
# not by this application; it is kept here only to document the shared path.
spark = (
    SparkSession.builder
    .master("spark://spark-master:7077")
    .appName("kafka-example")
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .config("spark.eventLog.enabled", "true")
    .config("spark.eventLog.dir", "hdfs:///spark/logs/history")
    .config("spark.history.fs.logDirectory", "hdfs:///spark/logs/history")
    .getOrCreate()
)




Using packages ['org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.5', 'org.apache.kafka:kafka-clients:3.5.1']
:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/jovyan/.ivy2/cache
The jars for the packages stored in: /home/jovyan/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.apache.kafka#kafka-clients added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-0c8ef836-a4b2-4a4f-b3f4-635c74d13022;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.5 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.5 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found org.xerial.snappy#snappy-java;1.1.10.5 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
	found org.apache.kafka#kafka-clients;3.5.1 in central
	found com.github.luben#zstd-jni;1.5.5-1 in centra

In [2]:
# Subscribe to the "entrada" topic. Kafka delivers `key` and `value` as binary,
# so decode both to strings and KEEP the result as `df` (previously the cast was
# computed only for display and discarded, leaving `df` with binary columns).
raw_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka-1:9092")
    .option("subscribe", "entrada")
    .load()
)
df = raw_df.selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
df

DataFrame[key: string, value: string]

In [3]:
# Split each message into words on runs of whitespace. The explicit cast keeps
# this correct even if `df.value` is still Kafka's raw binary column, and empty
# tokens (produced by leading or repeated separators) are dropped so they are
# not counted as a "word".
lines = df
tokens = lines.select(
    explode(split(lines.value.cast("string"), r"\s+")).alias("word")
)
words = tokens.filter(tokens.word != "")

# Generate a running word count.
wordCounts = words.groupBy("word").count()

# Shape the counts for the Kafka sink, which takes string `key`/`value` columns.
outputDF = wordCounts.selectExpr(
    "CAST(word AS STRING) AS key",
    "CAST(count AS STRING) AS value",
)

In [5]:
 # Start the word-count query with an in-memory sink: in "complete" mode the
# whole count table is rewritten on every trigger and can be queried with
# Spark SQL as the table "consulta2". No checkpointLocation is given, so Spark
# creates a temporary checkpoint directory.
memory_writer = (
    wordCounts.writeStream
    .outputMode("complete")
    .format("memory")
    .queryName("consulta2")
)
query = memory_writer.start()


25/04/07 10:32:20 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-aea793d2-f4e2-4ce5-a98f-9642b4717e91. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/04/07 10:32:20 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
[Stage 1:=>              (18 + 2) / 200][Stage 3:>                (0 + 0) / 200]

In [4]:

# Publish the running word counts (string key = word, string value = count) to
# the "salida" topic. In "complete" mode every trigger re-emits the whole table.
# NOTE(review): the checkpoint path is resolved against the default filesystem
# (presumably HDFS on this cluster) — TODO confirm.
kafka_writer = (
    outputDF.writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka-1:9092")
    .option("topic", "salida")
    .outputMode("complete")
    .option("checkpointLocation", "/user/jovyan/checkpoint")
)
query = kafka_writer.start()


25/04/07 10:31:14 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
[Stage 1:====>                                                   (16 + 2) / 200]

In [None]:
from IPython.display import display, clear_output
from time import sleep

# Refresh the query status and the in-memory "consulta2" table every 5 seconds
# for as long as the streaming query is active (instead of spinning forever
# after it stops). Interrupt the kernel to stop polling early.
try:
    while query.isActive:
        clear_output(wait=True)
        display(query.status)
        # DataFrame.show() prints the table itself and returns None; wrapping it
        # in display() used to render a stray "None" under the table.
        spark.sql('SELECT * FROM consulta2').show()
        sleep(5)
except KeyboardInterrupt:
    # Deliberate: a kernel interrupt just ends this polling loop.
    print('Polling stopped.')

{'message': 'Getting offsets from KafkaV2[Subscribe[entrada]]',
 'isDataAvailable': False,
 'isTriggerActive': True}

+-----+-----+
| word|count|
+-----+-----+
|hello|    1|
|kitty|    1|
+-----+-----+



None

In [None]:
from IPython.display import display, clear_output
from time import sleep
# One-off snapshot of the query status and the in-memory result table.
clear_output(wait=True)
display(query.status)
# The memory sink registers its table as "consulta2" (there is no "consulta1",
# which made this cell fail). show() prints directly and returns None, so it is
# not wrapped in display().
spark.sql('SELECT * FROM consulta2').show()



In [None]:
from pyspark.sql.functions import to_json
kafka_output_topic = "salida"

# Options for the Kafka sink: bootstrap servers and destination topic.
kafka_output_config = {
    "kafka.bootstrap.servers": "kafka-1:9092",  # put the Kafka bootstrap servers here
    "topic": kafka_output_topic,
}

# Second Kafka sink: the record key is the word, and the value is the whole row
# serialized to JSON by the SQL expression to_json(struct(*)), e.g.
# {"word":"hello","count":1}.
# NOTE(review): this targets the same "salida" topic as the plain-text Kafka
# sink started earlier, so consumers will receive both formats — confirm intended.
json_rows = wordCounts.selectExpr(
    "CAST(word AS STRING) AS key",
    "to_json(struct(*)) AS value",
)
query2 = (
    json_rows.writeStream
    .format("kafka")
    .outputMode("complete")
    .options(**kafka_output_config)
    .option("checkpointLocation", "/home/jovyan/checkpoints")
    .start()
)


In [None]:
# Block this cell until `query` terminates; if the query failed, its exception
# is re-raised here. No timeout: interrupt the kernel to stop waiting.
query.awaitTermination(timeout=None)