In [1]:
!pip install mmh3 bitarray



In [2]:
import pickle

import mmh3
from bitarray import bitarray


class BloomFilter:
    """Fixed-size Bloom filter backed by a ``bitarray`` and seeded ``mmh3`` hashes.

    Items are converted with ``str()`` before hashing, so ``1`` and ``"1"`` map to
    the same bits. Bits are only ever set, never cleared, so a lookup can give a
    false positive but never misses an item that was added.
    """

    def __init__(self, size, hash_count):
        """Create an empty filter.

        Args:
            size: number of bits in the filter; must be positive.
            hash_count: number of seeded hashes (bit positions) per item; must be positive.

        Raises:
            ValueError: if ``size`` or ``hash_count`` is not positive.
        """
        # Validate up front: size == 0 would make every add/lookup raise
        # ZeroDivisionError, and hash_count == 0 would make every lookup
        # report membership.
        if size <= 0:
            raise ValueError(f"size must be positive, got {size!r}")
        if hash_count <= 0:
            raise ValueError(f"hash_count must be positive, got {hash_count!r}")
        self.bit_array = bitarray(size)
        self.bit_array.setall(0)
        self.size = size
        self.hash_count = hash_count

    def _indices(self, item):
        """Yield the ``hash_count`` bit positions for ``item`` (mmh3 seeds 0..hash_count-1)."""
        key = str(item)
        for seed in range(self.hash_count):
            # mmh3.hash may return a negative value; Python's % with a positive
            # divisor is non-negative, so the index always lands in [0, size).
            yield mmh3.hash(key, seed) % self.size

    def add(self, item):
        """Set every bit position belonging to ``item``."""
        for index in self._indices(item):
            self.bit_array[index] = 1

    def __contains__(self, item):
        """Return True if all of ``item``'s bits are set (possibly a false positive)."""
        return all(self.bit_array[index] for index in self._indices(item))


In [3]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructField, StructType, StringType, BooleanType, IntegerType
from configs import *

In [4]:
# Local Spark session using every available core.
# NOTE: the Kafka source (BROKER_TYPE != 'Socket') also needs the connector on the classpath, e.g.
#   .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1")
# and that coordinate must match the installed Spark/Scala version.
spark = (
    SparkSession.builder
    .appName("WikiStreamProcessor")
    .master("local[*]")
    .getOrCreate()
)

# Only log messages at ERROR level or above from here on.
spark.sparkContext.setLogLevel("ERROR")

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/19 16:58:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
# Schema of the JSON events read from the stream; every field is nullable.
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("id", StringType()),
        ("title", StringType()),
        ("timestamp", StringType()),
        ("user", StringType()),
        ("bot", BooleanType()),
        ("minor", BooleanType()),
        ("change", IntegerType()),
        ("comment", StringType()),
    )
])

In [6]:
# Load the pre-built Bloom filter state: a (bit_array, hash_count, size) tuple,
# judging by how check_bloom_filter unpacks it.
# NOTE(review): pickle.load can execute arbitrary code — only load artifacts you built yourself.
with open("../artifacts/bloom_state.pkl", "rb") as file:
    deserialized_data = pickle.load(file)

# Broadcast the deserialized state directly. PySpark pickles broadcast values itself
# and caches the loaded value on each worker, so the UDF does not pay for a
# pickle.loads on every row.
broadcast_bloom_filter = spark.sparkContext.broadcast(deserialized_data)


def check_bloom_filter(item):
    """Return True if ``item`` may be in the broadcast Bloom filter, False if it definitely is not.

    Uses the same scheme as ``BloomFilter.__contains__``: ``str(item)`` hashed with
    ``mmh3`` seeds ``0..hash_count-1``, each taken modulo ``size``.
    """
    bit_array, hash_count, size = broadcast_bloom_filter.value
    key = str(item)  # same string for every seed, so convert once
    for seed in range(hash_count):
        if bit_array[mmh3.hash(key, seed) % size] == 0:
            return False
    return True


bloom_udf = F.udf(check_bloom_filter, BooleanType())

In [7]:
# Choose the streaming source from BROKER_TYPE (from configs): a TCP socket, otherwise Kafka.
if BROKER_TYPE == 'Socket':
    source_format = "socket"
    source_options = {"host": SOCKET_HOST, "port": SOCKET_PORT}
else:
    source_format = "kafka"
    source_options = {"kafka.bootstrap.servers": KAFKA_BROKER, "subscribe": KAFKA_TOPIC}

stream_df = spark.readStream.format(source_format).options(**source_options).load()

In [9]:
# Parse JSON from the incoming data stream and tag each event with the Bloom filter verdict.
# The Kafka source delivers `value` as binary while the socket source delivers a string;
# casting to string first lets from_json handle both (it is a no-op for the socket source).
parsed_df = (
    stream_df
    .withColumn("data", F.from_json(F.col("value").cast("string"), schema))
    .select("data.*")
    .withColumn("is_bot_bloom", bloom_udf(F.col("user")))
)

# Add one 0/1 column per confusion-matrix cell by comparing the ground-truth `bot`
# flag with the Bloom filter prediction; a null in either input yields 0 in every cell.
metrics_df = parsed_df.select(
    "*",
    *[
        F.when(
            (F.col("bot") == actual) & (F.col("is_bot_bloom") == predicted), 1
        ).otherwise(0).alias(cell)
        for cell, actual, predicted in (
            ("tp", 1, 1),
            ("tn", 0, 0),
            ("fp", 0, 1),
            ("fn", 1, 0),
        )
    ],
)

# Sum the per-event flags into global confusion-matrix counts
# (a global aggregation over no rows yields null sums).
agg_metrics = metrics_df.agg(*(F.sum(cell).alias(cell) for cell in ("tp", "tn", "fp", "fn")))

# Derive accuracy, precision, recall and F1 from the confusion-matrix counts.
# F1 uses the closed form 2*tp / (2*tp + fp + fn): it equals the harmonic mean of
# precision and recall whenever both exist, and is 0 (instead of null) when tp == 0
# but fp + fn > 0. Ratios with a zero denominator come out null (Spark non-ANSI mode).
final_metrics = agg_metrics.selectExpr(
    "(tp + tn) / (tp + tn + fp + fn) AS accuracy",
    "tp / (tp + fp) AS precision",
    "tp / (tp + fn) AS recall",
    "2 * tp / (2 * tp + fp + fn) AS f1",
    "tp", "tn", "fp", "fn"
)

# Print the full, continuously updated metrics table to the console every 10 seconds.
query = (
    final_metrics.writeStream
    .outputMode("complete")
    .trigger(processingTime="10 second")
    .format("console")
    .start()
)

try:
    # Blocks until the query terminates (or the cell is interrupted).
    query.awaitTermination()
finally:
    # Interrupting the cell only aborts the blocking py4j call; without stop() the
    # streaming query keeps running in the JVM and keeps printing batches.
    query.stop()

                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+--------+---------+------+----+----+----+----+----+
|accuracy|precision|recall|  f1|  tp|  tn|  fp|  fn|
+--------+---------+------+----+----+----+----+----+
|    null|     null|  null|null|null|null|null|null|
+--------+---------+------+----+----+----+----+----+

-------------------------------------------
Batch: 1
-------------------------------------------
+--------+---------+------+----+---+---+---+---+
|accuracy|precision|recall|  f1| tp| tn| fp| fn|
+--------+---------+------+----+---+---+---+---+
|     1.0|     null|  null|null|  0|  4|  0|  0|
+--------+---------+------+----+---+---+---+---+

-------------------------------------------
Batch: 2
-------------------------------------------
+------------------+---------+------+----+---+---+---+---+
|          accuracy|precision|recall|  f1| tp| tn| fp| fn|
+------------------+---------+------+----+---+---+---+---+
|0.9285714285714286|

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/Users/mpalamariuk/anaconda3/envs/mmds-project/lib/python3.9/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/Users/mpalamariuk/anaconda3/envs/mmds-project/lib/python3.9/site-packages/py4j/clientserver.py", line 475, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/Users/mpalamariuk/anaconda3/envs/mmds-project/lib/python3.9/socket.py", line 716, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [15]:

# Sanity-check the Bloom filter UDF on a few hand-written rows;
# only "ListeriaBot" is expected to be flagged.
data = [
    dict(id=1, text="This is a test sentence."),
    dict(id=2, text="Another example of text data."),
    dict(id=3, text="ListeriaBot"),
    dict(id=4, text="Spark is great for distributed computing."),
]

df = spark.createDataFrame(data)

(
    df
    .withColumn("is_bot_bloom", bloom_udf(F.col("text")))
    .show()
)

+---+--------------------+------------+
| id|                text|is_bot_bloom|
+---+--------------------+------------+
|  1|This is a test se...|       false|
|  2|Another example o...|       false|
|  3|         ListeriaBot|        true|
|  4|Spark is great fo...|       false|
+---+--------------------+------------+

