# Exercise 6: Data Streams [100 pts]

## 1. Querying Users [30 pts]

Below is the code/pseudocode outline for the process. Since there are only 6,000 messages, we will process all of them. We first receive all of the messages and store them in a `defaultdict`, then classify them by frequency. The result should be a printed dictionary whose keys are the frequencies (1, 2, etc.) and whose values are the queries seen that many times, so each query is categorized by the number of times it has been seen.

Similar to the slide referenced in the question, we will sample 1/10th of the stream. The code hashes each message into one of 10 buckets based on the user and the day of the timestamp, and keeps only bucket 0. Because the hash key is the user and the day, all of a user's queries on a given day land in the same bucket. Therefore, we can answer definitively whether a sampled user repeated a query on a given day, since all of that user's queries for the day are kept together. This avoids storing partial data for a user, which would give incorrect answers about repeated queries. The sample could also be made finer-grained by hashing into 100 buckets instead and keeping 10 of them, which still satisfies the 1/10th sampling constraint from the referenced slide.
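The bucket-based sampling described above generalizes to keeping any fraction a/b: hash the key into b buckets and keep buckets 0 through a-1. A minimal sketch, assuming the same `user:day` key as `hash_bucket`; the function name `in_sample` and its `keep`/`num_buckets` parameters are hypothetical, chosen for illustration.

```python
from hashlib import md5

def in_sample(user, day, keep=10, num_buckets=100):
    """Return True if this (user, day) pair falls in the sample.

    Hashing the key into num_buckets buckets and keeping buckets
    0..keep-1 samples roughly keep/num_buckets of all (user, day) pairs.
    """
    key = f"{user}:{day}"
    bucket = int(md5(key.encode()).hexdigest(), 16) % num_buckets
    return bucket < keep

# Every query from the same user on the same day gets the same decision.
users = [f"user{i}" for i in range(5000)]
sampled = [u for u in users if in_sample(u, "2025-04-05")]
print(f"{len(sampled)} of {len(users)} users sampled "
      f"({len(sampled) / len(users):.1%})")
```

Because the decision depends only on the user and the day, every query a sampled user makes that day is kept together, just as in the 10-bucket version.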

In [None]:
from hashlib import md5
from collections import defaultdict

def hash_bucket(user, query, timestamp, num_buckets=10):
    # The query is not part of the key, so all of a user's queries on a day share a bucket
    day = timestamp.split("T")[0]  # '2025-04-05T13:55:00' -> '2025-04-05'
    key = f"{user}:{day}"  # sample by user per day
    bucket = int(md5(key.encode()).hexdigest(), 16) % num_buckets
    return bucket

# Storage for 1/10th of the stream: (user, day) -> (query -> count)
stored_queries = defaultdict(lambda: defaultdict(int))

# Simulated stream; `dispatcher` is assumed to be provided by the exercise environment
for ev in dispatcher.launch():
    user = ev["user"]
    query = ev["msg"]
    timestamp = ev["timestamp"]

    if hash_bucket(user, query, timestamp) == 0:  # keep only bucket 0
        day = timestamp.split("T")[0]
        stored_queries[(user, day)][query] += 1


To report how many times each sampled user ran each query on a given day, code along the lines of the following can be run.

In [None]:
for (user, day), queries in stored_queries.items():
    for query, count in queries.items():
        print(f"{user} ran query '{query}' {count} time(s) on {day}")
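The frequency classification described at the start of this section can be sketched as follows, assuming the sampled counts have the `(user, day) -> {query: count}` shape built above. The helper `group_by_frequency` and the sample data are hypothetical; with the real stream, `stored_queries` would come from the sampling loop instead.

```python
from collections import Counter, defaultdict

def group_by_frequency(query_counts):
    """Invert {query: count} into {count: [queries seen exactly that many times]}."""
    by_freq = defaultdict(list)
    for query, count in query_counts.items():
        by_freq[count].append(query)
    return dict(sorted(by_freq.items()))

# Hypothetical sample in the same shape as stored_queries: (user, day) -> {query: count}
stored_queries = {
    ("alice", "2025-04-05"): {"weather": 2, "news": 1},
    ("bob", "2025-04-05"): {"weather": 1},
}

# Total each query over all sampled user-days, then group the queries by frequency
totals = Counter()
for queries in stored_queries.values():
    totals.update(queries)
print(group_by_frequency(totals))  # {1: ['news'], 3: ['weather']}

# Share of (user, day, query) combinations that were repeated within the day
repeated = sum(1 for q in stored_queries.values() for c in q.values() if c > 1)
distinct = sum(len(q) for q in stored_queries.values())
print(f"{repeated}/{distinct} user-day queries were repeated")
```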

## 2. Bloom Filter [40 pts]

### [10 pts] Create a Bloom Filter, approximately 1000-2000 bits in size, for detecting bad words (i.e., AFINN of -4 or -5). It should be designed to run in Spark.

In [None]:
import base64
import hashlib

class BloomFilter:
    def __init__(self, size=2048, num_hashes=3):
        self.size = size
        self.num_hashes = num_hashes
        self.bit_vector = [0] * size

    def _hashes(self, word):
        hashes = []
        for i in range(self.num_hashes):
            hash_digest = hashlib.md5(f"{word}_{i}".encode()).hexdigest()
            index = int(hash_digest, 16) % self.size
            hashes.append(index)
        return hashes

    def add(self, word):
        for idx in self._hashes(word):
            self.bit_vector[idx] = 1

    def check(self, word):
        return all(self.bit_vector[idx] for idx in self._hashes(word))

    def to_base64(self):
        # Pack bits into bytes, 8 bits per byte (assumes size is a multiple of 8)
        bitstring = ''.join(map(str, self.bit_vector))
        byte_array = bytearray(int(bitstring[i:i+8], 2) for i in range(0, len(bitstring), 8))
        return base64.b64encode(byte_array).decode()

# === Step 1: Load bad words (one per line) ===
with open('bad_words.txt') as f:
    bad_words = [line.strip().lower() for line in f if line.strip()]

# === Step 2: Build Bloom Filter ===
bloom = BloomFilter()
for word in bad_words:
    bloom.add(word)

# === Step 3: Save the Base64 bit vector to a local file (then copied to HDFS) ===
b64_string = bloom.to_base64()
with open('bloomfilter_base64.txt', 'w') as f:
    f.write(b64_string)

print("Bloom filter created and written to bloomfilter_base64.txt")
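To sanity-check the filter's size and hash count, the standard approximation for a Bloom filter's false-positive rate, p ≈ (1 - e^(-kn/m))^k with m bits, k hash functions, and n stored words, can be evaluated directly. A small sketch; n = 63 is an assumption borrowed from the `num_items=63` used by the listener later in this section, not a verified count of AFINN -4/-5 words, and the helper names are hypothetical.

```python
import math

def false_positive_rate(m, k, n):
    """Approximate false-positive rate of a Bloom filter: (1 - e^(-k*n/m))^k."""
    return (1 - math.exp(-k * n / m)) ** k

def optimal_k(m, n):
    """Hash count minimizing the false-positive rate: (m / n) * ln 2, rounded."""
    return max(1, round((m / n) * math.log(2)))

n = 63  # assumed number of stored bad words
for m, k in [(2048, 3), (2048, optimal_k(2048, n)), (1500, optimal_k(1500, n))]:
    print(f"m={m}, k={k}: p ~ {false_positive_rate(m, k, n):.2e}")
```

With only a few dozen words, even the 3-hash, 2048-bit configuration keeps the estimated rate well below 1%, so the choice of k matters less here than keeping the builder and the Spark-side checker consistent.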

### [10 pts] The bit vector should be placed in HDFS as a Base64-encoded text file and loaded into Spark from HDFS.

I ran the program above and then used the following command to put the file in HDFS:

        hdfs dfs -put bloomfilter_base64.txt /user/brian_farrell/bloomfilter_base64.txt
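Before relying on the HDFS copy, the Base64 round trip can be checked locally: packing the bits the same way `BloomFilter.to_base64` does and unpacking them as the Spark side must should reproduce the original vector exactly. The helper names `bits_to_base64` and `base64_to_bits` are hypothetical.

```python
import base64
import random

def bits_to_base64(bits):
    """Pack a list of 0/1 ints (length a multiple of 8) into Base64 text, MSB first."""
    packed = bytes(int("".join(map(str, bits[i:i + 8])), 2) for i in range(0, len(bits), 8))
    return base64.b64encode(packed).decode()

def base64_to_bits(text, size):
    """Decode Base64 text back into a list of `size` 0/1 ints, MSB first."""
    raw = base64.b64decode(text)
    return [int(b) for byte in raw for b in f"{byte:08b}"][:size]

# Round-trip a random 2048-bit vector, the same size as BloomFilter's default
original = [random.randint(0, 1) for _ in range(2048)]
encoded = bits_to_base64(original)
assert base64_to_bits(encoded, 2048) == original
print(f"{len(encoded)} Base64 characters encode {len(original)} bits")
```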


### [15 pts] Integrate the Bloom Filter into Spark such that every arriving sentence is examined and passed along if none of the words in the sentence are bad words. Sentences that do contain bad words should be suppressed.

*Work in progress (DStream-based draft):*

In [None]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
import base64
import hashlib

# Initialize Spark with 1-second micro-batches
sc = SparkContext(appName="DrunkSpeechFilter")
ssc = StreamingContext(sc, 1)

# Load the Bloom filter from HDFS (the file uploaded with `hdfs dfs -put` above)
b64_string = sc.textFile("hdfs:///user/brian_farrell/bloomfilter_base64.txt").first()
raw = base64.b64decode(b64_string)
bit_vector = [int(bit) for byte in raw for bit in f"{byte:08b}"]  # 2048 bits, MSB first

# Reconstruct the BloomFilter lookup (must match BloomFilter: md5, size 2048, 3 hashes)

def get_hashes(word, size=2048, num_hashes=3):
    return [int(hashlib.md5(f"{word}_{i}".encode()).hexdigest(), 16) % size for i in range(num_hashes)]

def is_bad(word):
    return all(bit_vector[idx] for idx in get_hashes(word.lower()))

def contains_bad_words(sentence):
    return any(is_bad(word) for word in sentence.split())

# DStream of sentences read from a socket (e.g., a netcat server started with `nc -lk 9999`)
lines = ssc.socketTextStream("localhost", 9999)

# Filter clean sentences
clean_sentences = lines.filter(lambda sentence: not contains_bad_words(sentence))

# Output
clean_sentences.pprint()

ssc.start()
ssc.awaitTermination()
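The filter above splits sentences on whitespace only, so punctuation stays attached to words (e.g., `heck!`), and such a token would not match the stored bad word. A minimal pure-Python sketch of normalizing tokens before the lookup, assuming a lowercase bad-word list; here a plain set stands in for the Bloom filter, and the word list, `tokenize`, and `is_clean` are hypothetical.

```python
import re

# Keep only runs of ASCII letters and apostrophes (an assumption suited to an English word list)
WORD_RE = re.compile(r"[a-z']+")

def tokenize(sentence):
    """Lowercase a sentence and drop punctuation, returning the word tokens."""
    return WORD_RE.findall(sentence.lower())

def is_clean(sentence, might_be_bad):
    """True if no token is (possibly) a bad word; might_be_bad is a membership test."""
    return not any(might_be_bad(word) for word in tokenize(sentence))

# Stand-in for the Bloom filter: a plain set of hypothetical bad words
bad = {"darn", "heck"}
sentences = ["What the HECK!", "Nice weather today.", "darn, it's late"]
clean = [s for s in sentences if is_clean(s, bad.__contains__)]
print(clean)  # ['Nice weather today.']
```

In the DStream job above, `is_bad` could be passed as `might_be_bad`, since it already answers "possibly in the set" for a single word.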

In [None]:
"""Quiz6Q2Listener.py"""

import sys, time

import pyspark
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

import math
import mmh3
from bitarray import bitarray
import base64

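class BloomFilterAbridged:
    """An abridged version of the BloomFilter class I previously built.
       Can reconstruct the bit array from a Base64-encoded bit string.
       The hash function (mmh3), size (1500 bits), and hash count must match
       the filter that produced the Base64 file; they differ from the
       md5-based, 2048-bit BloomFilter shown earlier.
    """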
    def __init__(self, num_items, false_positive_prob):

        # Create a Bloom Filter approximately 1000-2000 bits in size
        self.size = 1500

        # Calculate optimal number of hash functions to use
        self.hash_count = self.det_optimal_hash_count(self.size, num_items)

        # Initialize bit array of all zeroes
        self.bit_array = bitarray(self.size)
        self.bit_array.setall(0)

        self.false_positive_prob = false_positive_prob  # false positive probability

    @classmethod
    def from_base64(cls, base64_str, num_items, false_positive_prob):
        """
        Create a bloom filter with a bit array that is encoded in a Base64 bit
        string.

        Parameters:
            base64_str : str
                The Base64-encoded bit string that the bit vector is encoded as.
            num_items : int
                Number of items expected to be stored in bloom filter
            false_positive_prob : float
                False Positive probability in decimal

        Returns:
            BloomFilter object with loaded bit array
        """

        # Create empty bloom filter
        bloom = cls(num_items, false_positive_prob)

        # Decode Base64 to bytes
        byte_array = base64.b64decode(base64_str.encode('utf-8'))

        # Convert bytes to bitarray
        bloom.bit_array = bitarray()
        bloom.bit_array.frombytes(byte_array)
        bloom.bit_array = bloom.bit_array[:bloom.size]  # keep only `size` bits (drops byte padding)

        # Return the filter
        return bloom

    def check(self, item):
        """Check for existence of an item in filter
        """
        for i in range(self.hash_count):

            # Calculate the same hash positions as done in add()
            digest = mmh3.hash(item, i) % self.size

            # If any bit is not set, the item definitely doesn't exist
            if not self.bit_array[digest]:
                return False

        # If all bits were set, the item might exist
        return True

    def det_optimal_hash_count(self, m, n):
        """Calculates the optimal number of hash functions using k = (m/n) * lg(2)

        This formula is from: https://en.wikipedia.org/wiki/Bloom_filter

            m : int
                size of bit array
            n : int
                number of items expected to be stored in filter
        """
        k = (m/n) * math.log(2)
        return int(k)

    def get_bit_array(self):
        return self.bit_array


def setLogLevel(sc, level):
    from pyspark.sql import SparkSession
    spark = SparkSession(sc)
    spark.sparkContext.setLogLevel(level)


if __name__ == "__main__":

    if len(sys.argv) != 3:
        print("Usage: Quiz6Q2Listener.py <hostname> <port>", file=sys.stderr)
        sys.exit(-1)

    print ('Argv', sys.argv)

    host = sys.argv[1]
    port = int(sys.argv[2])
    print ('host', type(host), host, 'port', type(port), port)

    sc_bak = SparkContext.getOrCreate()
    sc_bak.stop()

    time.sleep(15)
    print ('Ready to work!')

    ctx = pyspark.SparkContext(appName = "Netcat Wordcount", master="local[*]")
    print ('Context', ctx)

    spark = SparkSession(ctx).builder.getOrCreate()
    sc = spark.sparkContext

    setLogLevel(sc, "WARN")

    print ('Session:', spark)
    print ('SparkContext', sc)


    # Create DataFrame representing the stream of input lines from connection to host:port
    lines = spark\
        .readStream\
        .format('socket')\
        .option('host', host)\
        .option('port', port)\
        .load()

    # Read the Base64-encoded bit string from the Base64-encoded text file and
    # broadcast it.
    file_df = spark.read.text('/user/brian_farrell/bloomfilter_base64.txt')
    base64_str = file_df.first()['value']
    broadcast_filter = spark.sparkContext.broadcast(base64_str)

    def check_line_for_bad_words(line):
        """Check if any of the words in a line are bad.
        """
        # Create Bloom filter with the bit array encoded in the broadcasted
        # Base64 string.
        bf = BloomFilterAbridged.from_base64(broadcast_filter.value,
                                            num_items=63,
                                            false_positive_prob=0.05)

        # Split the line into lowercase words to match the lowercased bad-word list
        words = line.lower().split()

        # Return True if all words are good (no bad words found)
        return all(not bf.check(word) for word in words)

    # Register the user defined function
    is_clean_line = udf(check_line_for_bad_words, BooleanType())

    # Filter lines to only keep those without bad words
    clean_lines = lines.filter(is_clean_line(lines.value))

    # Start running the query that prints the clean lines to the console
    query = clean_lines\
        .writeStream\
        .outputMode('append')\
        .format('console')\
        .option("truncate", False)\
        .option("numRows", 100000)\
        .option("separator", "")\
        .option("header", False)\
        .start()

    query.awaitTermination()

[Demo](https://tufts.zoom.us/rec/share/PHLhSyKuWtj2wErp890cV_zZrr_phiRXFMEfZZcFedOjXkDBw3JCotma7a9h79aa.nn_9dtpICC560Wg_?startTime=1744123581000)

## 3. Counting Unique Users [30 pts]

### Modify read_stdin.py to implement the HyperLogLog algorithm. Increase the number of senders and decrease the (μ,σ) of the delay between queries until the receiver can no longer keep up! Draw a graph of the estimated number of users as a function of elapsed time.


Below is the modified click-feeder.py, which significantly increases the number of simulated users and uses a short delay between events so that read_stdin.py can no longer keep up.

In [None]:
#!/usr/bin/env python3

import random
import time

# Configuration: high number of users, low delay
NUM_USERS = 10000          # Total users to simulate
NUM_QUERIES = 100          # Number of distinct query strings
TOTAL_EVENTS = 10000000    # Total events to emit (set high for an "infinite" effect)
MEAN_DELAY = 0.01          # Mean delay between events (seconds)
STD_DEV_DELAY = 0.003      # Standard deviation of the delay (seconds)

# Create list of user names and queries
user_ids = [f"user{str(i).zfill(5)}" for i in range(NUM_USERS)]
query_templates = [f"query_{i}" for i in range(NUM_QUERIES)]

# Emit events
for _ in range(TOTAL_EVENTS):
    user = random.choice(user_ids)
    query = random.choice(query_templates)
    timestamp = int(time.time())

    # Output format: <user>\t<query>\t<timestamp>
    print(f"{user}\t{query}\t{timestamp}", flush=True)

    # Control the delay between events
    delay = max(0, random.gauss(MEAN_DELAY, STD_DEV_DELAY))
    time.sleep(delay)
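To judge the HyperLogLog estimates against ground truth, it helps to know how many distinct users to expect. If each event picks one of U users uniformly at random (as `random.choice` does here), the expected number of distinct users after N events is U(1 - (1 - 1/U)^N). A sketch; `expected_distinct` is a hypothetical helper, and the 100 events/s rate is only an upper bound implied by `MEAN_DELAY = 0.01`, since printing and the loop itself add overhead.

```python
def expected_distinct(num_users, num_events):
    """Expected distinct users after num_events uniform draws from num_users users."""
    return num_users * (1 - (1 - 1 / num_users) ** num_events)

# At most ~100 events/s with MEAN_DELAY = 0.01 s; NUM_USERS = 10000
for seconds in (10, 30, 60):
    events = 100 * seconds
    print(f"{seconds:>3}s, {events} events: ~{expected_distinct(10000, events):.0f} distinct users")
```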


The updated read_stdin.py using hyperloglog is below. 

In [None]:
#!/usr/bin/env python3

import sys
import time
import hyperloglog

# Initialize HyperLogLog with a relative error rate of ~1%
hll = hyperloglog.HyperLogLog(0.01)

# Track time
start_time = time.time()
interval = 1  # seconds between measurements
next_tick = start_time + interval

# Optional: for logging (to create a graph later if desired)
log = []

print("Elapsed(s)\tEstimated Unique Users", flush=True)

try:
    for line in sys.stdin:
        try:
            user, query, timestamp = line.strip().split('\t')
        except ValueError:
            continue  # skip malformed lines

        hll.add(user)

        now = time.time()
        if now >= next_tick:
            elapsed = int(now - start_time)
            est_count = len(hll)
            print(f"{elapsed}\t{est_count}", flush=True)
            log.append((elapsed, est_count))
            next_tick += interval

except KeyboardInterrupt:
    print("Streaming stopped by user.")
    sys.exit(0)
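The script above relies on the `hyperloglog` package. To show what the algorithm does internally, here is a minimal from-scratch sketch, not that package's implementation: each user is hashed to 64 bits, the first p bits choose one of 2^p registers, each register keeps the largest leading-zero rank seen, and the estimate is a bias-corrected harmonic mean of the registers, with linear counting for small cardinalities. The class name `SimpleHLL`, the SHA-1 hash, and p = 14 (about 0.8% standard error) are assumptions; since it supports `add()` and `len()`, it could be swapped in for `hll` above.

```python
import hashlib
import math

class SimpleHLL:
    """Minimal HyperLogLog: 2**p registers, each holding the max leading-zero rank seen."""

    def __init__(self, p=14):
        self.p = p
        self.m = 1 << p
        self.registers = [0] * self.m
        self.alpha = 0.7213 / (1 + 1.079 / self.m)  # bias-correction constant for m >= 128

    def add(self, item):
        h = int.from_bytes(hashlib.sha1(item.encode()).digest()[:8], "big")  # 64-bit hash
        idx = h >> (64 - self.p)                      # first p bits pick the register
        rest = h & ((1 << (64 - self.p)) - 1)         # remaining 64 - p bits
        rank = (64 - self.p) - rest.bit_length() + 1  # position of the leftmost 1-bit
        self.registers[idx] = max(self.registers[idx], rank)

    def __len__(self):
        est = self.alpha * self.m * self.m / sum(2.0 ** -r for r in self.registers)
        zeros = self.registers.count(0)
        if est <= 2.5 * self.m and zeros:             # small range: use linear counting
            est = self.m * math.log(self.m / zeros)
        return int(round(est))

# 5,000 events drawn from 2,000 users; repeats do not change the estimate
hll = SimpleHLL()
for i in range(5000):
    hll.add(f"user{i % 2000:05d}")
print(len(hll))
```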


Command: `python3 click-feeder.py | python3 read_stdin.py > log.txt`

I ran the above command for 60 seconds and stored the output in log.txt:

            Elapsed(s)	Estimated Unique Users
            1	87
            2	169
            3	244
            4	320
            5	388
            6	451
            7	514
            8	579
            9	637
            10	687
            11	739
            12	796
            13	848
            14	896
            15	934
            16	977
            17	1016
            18	1054
            19	1096
            20	1131
            21	1177
            22	1212
            23	1239
            24	1257
            25	1293
            26	1320
            27	1352
            28	1379
            29	1398
            30	1429
            31	1454
            32	1470
            33	1494
            34	1514
            35	1534
            36	1551
            37	1564
            38	1589
            39	1608
            40	1627
            41	1637
            42	1654
            43	1666
            44	1678
            45	1688
            46	1704
            47	1715
            48	1721
            49	1732
            50	1748
            51	1759
            52	1771
            53	1778
            54	1787
            55	1794
            56	1802
            57	1806
            58	1809
            59	1815
            60	1820
            Streaming stopped by user.



I used the following code to graph the results:

In [None]:
import matplotlib.pyplot as plt

# Updated data from the second run
elapsed = list(range(1, 61))
estimated_users = [
    87, 169, 244, 320, 388, 451, 514, 579, 637, 687,
    739, 796, 848, 896, 934, 977, 1016, 1054, 1096, 1131,
    1177, 1212, 1239, 1257, 1293, 1320, 1352, 1379, 1398, 1429,
    1454, 1470, 1494, 1514, 1534, 1551, 1564, 1589, 1608, 1627,
    1637, 1654, 1666, 1678, 1688, 1704, 1715, 1721, 1732, 1748,
    1759, 1771, 1778, 1787, 1794, 1802, 1806, 1809, 1815, 1820
]

# Plotting the estimated unique users over time
plt.figure(figsize=(10, 6))
plt.plot(elapsed, estimated_users, marker='o', color='blue', linestyle='-')
plt.title('Estimated Unique Users Over Time')
plt.xlabel('Elapsed Time (seconds)')
plt.ylabel('Estimated Unique Users')
plt.grid(True)
plt.tight_layout()

plt.show()


![Estimated unique users over time](plot2.png)
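The plotting code above hardcodes the 60 estimates. A small parser could load them from `log.txt` instead, skipping the header and the final "Streaming stopped by user." line; `parse_hll_log` is a hypothetical helper, and its two return values could be passed straight to `plt.plot`.

```python
def parse_hll_log(lines):
    """Parse 'elapsed<TAB>estimate' rows, skipping the header and any non-numeric lines."""
    elapsed, estimates = [], []
    for line in lines:
        parts = line.split()
        if len(parts) == 2 and all(p.isdigit() for p in parts):
            elapsed.append(int(parts[0]))
            estimates.append(int(parts[1]))
    return elapsed, estimates

# With the real file:
# with open("log.txt") as f:
#     elapsed, estimated_users = parse_hll_log(f)
sample = ["Elapsed(s)\tEstimated Unique Users\n", "1\t87\n", "2\t169\n",
          "Streaming stopped by user.\n"]
print(parse_hll_log(sample))  # ([1, 2], [87, 169])
```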

As the graph and output above show, read_stdin.py starts to struggle to keep up with click-feeder.py at around 50 seconds. Before the 50-second mark the estimate climbs steadily, though its slope gradually decreases; after that it plateaus. I used 10,000 simulated users to ensure the estimate would not reach the true number of users during the run.
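To check directly whether read_stdin.py is falling behind, the feeder's timestamps can be used: click-feeder.py stamps each line with `int(time.time())`, and read_stdin.py already parses that field without using it. Comparing it with the receiver's clock gives a lag, and a lag that keeps growing from one report to the next means lines are waiting in the pipe before being read. A minimal sketch; `lag_seconds` is a hypothetical helper, and because the feeder's timestamps are whole seconds the lag is only accurate to about one second.

```python
import time

def lag_seconds(line, now=None):
    """Seconds between the feeder's timestamp on a line and when the receiver reads it.

    Assumes the click-feeder.py format: tab-separated user, query, Unix seconds.
    """
    _user, _query, timestamp = line.rstrip("\n").split("\t")
    if now is None:
        now = time.time()
    return now - int(timestamp)

# Hypothetical lags sampled at three reports; a rising sequence suggests a growing backlog
lags = [lag_seconds(f"user00001\tquery_5\t{t}\n", now=t + d)
        for t, d in [(100, 0.2), (110, 1.5), (120, 4.0)]]
print(lags)
```

Inside the read loop, `lag_seconds(line)` could be printed alongside each estimate so the log records both the HLL count and the backlog over time.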