### Streaming Index Pipeline

In [1]:
# Skip if you already have Kafka

# # pull docker images
# ! docker pull apache/kafka:3.8.0

# # Start the Kafka container
# ! docker run -d --name kafka -p 9092:9092 apache/kafka:3.8.0

# # Install kafka-python client
# ! pip3 install kafka-python

# # Download spark-sql-kafka jar from https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10_2.12/3.4.3
# # Download kafka-clients jar from https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients/3.3.2
# export PYSPARK_SUBMIT_ARGS='--jars /opt/spark-3.4.3/jars/spark-sql-kafka-0-10_2.12-3.4.3.jar,/opt/spark-3.4.3/jars/kafka-clients-3.3.2.jar pyspark-shell'

In [1]:
import json
import datetime

import sys

# On Python 3.12+, register the standalone `six.moves` module under the name
# 'kafka.vendor.six.moves' in sys.modules. This must run before `kafka` is
# imported below so that the import of that name resolves to this entry.
# NOTE(review): presumably works around kafka-python's vendored copy of six
# not importing cleanly on Python 3.12 — confirm against the installed version.
if sys.version_info >= (3, 12, 0):
    import six
    sys.modules['kafka.vendor.six.moves'] = six.moves

from kafka import KafkaProducer

def on_success(metadata):
    """Success callback for a send: print the offset carried by ``metadata``.

    Args:
        metadata: object exposing an ``offset`` attribute for the written record.
    """
    confirmation = f"Message produced with the offset: {metadata.offset}"
    print(confirmation)

def on_error(error):
    """Failure callback for a send: print ``error`` after a fixed prefix.

    Args:
        error: the exception (or any printable object) describing the failure.
    """
    report = f"An error occurred while publishing the message. {error}"
    print(report)

# Producer connected to the local broker; every value passed to send() is
# JSON-encoded and then converted to UTF-8 bytes.
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Update payload: the comment to modify and its new text.
message = {
    "COMMENT_ID" : "z123std54m2ozht10232efr5svb4vh0au04",
    "CONTENT": "damn nvm what I said"
}

try:
    # Send updates to 'youtube_update' topic; the returned future reports the
    # outcome through the callbacks registered below.
    future = producer.send("youtube_update", message)
    future.add_callback(on_success)
    future.add_errback(on_error)

    # Ensure all messages are sent before exiting
    producer.flush()
finally:
    # Close the producer even if send()/flush() raised, so a failed run does
    # not leave an open producer behind in the notebook kernel.
    producer.close()

Message produced with the offset: 1


In [2]:
# Download profane words dataset: https://www.kaggle.com/datasets/konradb/profanities-in-english-collection
path = "profanity_en.csv"

import pyspark
from pyspark.sql import SparkSession

# Get (or create) the Spark session named "indexer" using a default SparkConf.
conf = pyspark.SparkConf()
spark = (
    SparkSession.builder
    .appName("indexer")
    .config(conf=conf)
    .getOrCreate()
)

# The CSV is read without a header option, so the columns are addressed as
# _c0, _c1, ... and the header line arrives as an ordinary data row.
df = spark.read.csv(path)

# Keep only column _c1 and drop the row that holds the header text.
profane_df = df.select("_c1").where(df["_c1"] != 'canonical_form_1')
profane_df.show()

# Pull the single remaining column to the driver as a Python set of words.
profane_set = {row[0] for row in profane_df.collect()}

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/09 23:06:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


+-----------+
|        _c1|
+-----------+
|         69|
|        ass|
|       fuck|
|       fuck|
|       fuck|
|        ass|
|        sex|
|        sex|
|     orgasm|
|     orgasm|
|ejaculation|
|       arse|
|       arse|
|       arse|
|   foreskin|
|       shit|
|       shit|
|      skank|
|        ass|
|    abraham|
+-----------+
only showing top 20 rows



In [5]:
# Read from kafka and mask profane words from content
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType

# Shape of the JSON payload carried in each Kafka record value:
# two nullable string fields.
schema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in ("COMMENT_ID", "CONTENT")
])

# Streaming source subscribed to the 'youtube_update' topic on the local broker.
kafka_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "youtube_update")
    .option("startingOffsets", "latest")
    .option("failOnDataLoss", "false")
    .load()
)

# Sample payload with the same fields as `schema`; the stream definitions in
# this cell do not reference it.
data = {
    "COMMENT_ID" : "z123std54m2ozht10232efr5svb4vh0au04",
    "CONTENT": "damn nvm what I said"
}

# Cast the record value to a string, parse it as JSON against `schema`, and
# flatten the parsed struct into top-level COMMENT_ID / CONTENT columns.
json_stream = (
    kafka_stream
    .selectExpr("CAST(value AS STRING)")
    .select(from_json("value", schema).alias("data"))
    .select("data.*")
)

In [None]:
# Update the DataFrame to an Elasticsearch index
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType

import os

# Options handed to both the streaming writer and the per-batch Elasticsearch
# write. The credentials and checkpoint directory can be overridden through
# the environment (ES_USER, ES_PASSWORD, CHECKPOINT_LOCATION) so real secrets
# do not have to be committed with the notebook; the defaults are the values
# this notebook used originally.
es_conf = {
    "es.nodes.discovery": "false",
    "es.nodes.data.only": "false",
    "es.net.http.auth.user": os.environ.get("ES_USER", "elastic"),
    "es.net.http.auth.pass": os.environ.get("ES_PASSWORD", "password"),
    "es.nodes": "http://127.0.0.1",
    "es.port": "9200",
    # COMMENT_ID is mapped to the document id and excluded from the document
    # body; documents are written with the "update" operation.
    "es.mapping.id": "COMMENT_ID",
    "es.mapping.exclude": "COMMENT_ID",
    "es.write.operation": "update",
    # NOTE(review): the default "/tmp/" is shared with unrelated files and with
    # checkpoints of earlier runs; prefer a directory dedicated to this query.
    "checkpointLocation": os.environ.get("CHECKPOINT_LOCATION", "/tmp/"),
    "es.spark.sql.streaming.sink.log.enabled": "false"
}

def mask_profane_word(s):
    """Mask every token of ``s`` that appears in ``profane_set`` with '****'.

    The text is split on whitespace and re-joined with single spaces, so runs
    of whitespace are collapsed. Matching is an exact membership test against
    ``profane_set``: it is case-sensitive, and a token with punctuation
    attached is a different token.

    Args:
        s: the text to mask, or None. CONTENT is declared nullable in the
            stream schema, so the UDF can be handed None.

    Returns:
        The masked text, or None when ``s`` is None.
    """
    if s is None:
        # Keep a missing value missing instead of raising AttributeError on
        # None.split(), which would fail the whole micro-batch.
        return None
    return ' '.join('****' if token in profane_set else token for token in s.split())

def mask_profane_word_df(df, epoch_id):
    """foreachBatch handler: mask the CONTENT column and write the batch to Elasticsearch.

    Args:
        df: the micro-batch DataFrame.
        epoch_id: batch identifier supplied by foreachBatch; not used here.
    """
    target = 'CONTENT'
    masker = UserDefinedFunction(mask_profane_word, StringType())

    # Rebuild the projection column by column: the target column is replaced
    # by its masked version under the same name, the others pass through.
    projection = []
    for column in df.columns:
        if column == target:
            projection.append(masker(column).alias(target))
        else:
            projection.append(column)
    masked_df = df.select(*projection)

    # Append the masked batch to the 'youtube' resource via the
    # Elasticsearch Spark SQL data source.
    writer = masked_df.write.mode("append")
    writer = writer.format('org.elasticsearch.spark.sql').options(**es_conf)
    writer.save('youtube')
   
# Start the streaming query: each micro-batch of parsed messages is handed to
# mask_profane_word_df, which performs the masking and the Elasticsearch write.
query = (
    json_stream.writeStream
    .outputMode("append")
    .foreachBatch(mask_profane_word_df)
    .options(**es_conf)
    .start("youtube")
)

# Block this cell until the query terminates.
query.awaitTermination()

24/10/09 23:07:34 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/10/09 23:07:34 WARN KafkaMicroBatchStream: Partition youtube_update-0's offset was changed from 11 to 1, some data may have been missed. 
Some data may have been lost because they are not available in Kafka any more; either the
 data was aged out by Kafka or the topic may have been deleted before all the data in the
 topic was processed. If you want your streaming query to fail on such cases, set the source
 option "failOnDataLoss" to "true".
    
24/10/09 23:07:34 WARN KafkaMicroBatchStream: Partition youtube_update-0's offset was changed from 11 to 1, some data may have been missed. 
Some data may have been lost because they are not available in Kafka any more; either the
 data was aged out by Kafka or the topic may have been deleted before all the data in the
 topic was processed. If you want your streaming query to fail on such cases, se