In [2]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
import sys
import math

# Create a local StreamingContext with two working thread and batch interval of user given seconds
sc = SparkContext()

batch_interval = 10
ssc = StreamingContext(sc, batch_interval)
ssc.checkpoint("dgim")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/06/05 23:14:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
class Bucket:
    def __init__(self, ts, ones):
        self.ones = ones
        self.final_timestamp = ts

    # We use this func to merge two buckets and its ones count    
    def __add__(self, second_bucket):
        self.final_timestamp = max(self.final_timestamp, second_bucket.final_timestamp)
        self.ones += second_bucket.ones
        return self

# queue struct to merge buckets
class Queue:
    def __init__(self):
        self.buckets = [[]]

    def push(self, bucket):
        self.buckets[0].insert(0, bucket)
        self._merge_buckets()

    def _merge_buckets(self):
        for i in range(len(self.buckets)):
            if len(self.buckets[i]) > 2:
                try:
                    merged_bucket = self.buckets[i].pop() + self.buckets[i].pop()
                    self.buckets[i + 1].insert(0, merged_bucket)
                except IndexError:
                    self.buckets.append([])
                    self.buckets[i + 1].insert(0, merged_bucket)

    def evaluate(self, end_ts):
        ones = 0
        last_bucket = 0

        for bucket_group in self.buckets:
            for bucket in bucket_group:
                if bucket.final_timestamp < end_ts:
                    break
                else:
                    ones += bucket.ones
                    last_bucket = bucket.ones

        ones += math.floor(last_bucket / 2)

        return ones


# Visual improvement for development
def quiet_logging(context):
    logger = sc._jvm.org.apache.log4j
    logger.LogManager.getLogger("org"). setLevel( logger.Level.ERROR )
    logger.LogManager.getLogger("akka").setLevel( logger.Level.ERROR )

# http://spark.apache.org/docs/latest/api/python/reference/api/pyspark.streaming.DStream.updateStateByKey.html
def dgim(incoming_stream, prev_stream):
    samples = []
    queue = Queue()

    # resets every new stream
    timestamp = 0
    print("IT GOT RESET!!!")
    real_number_of_ones = 0
    for elem in incoming_stream:
        if elem=='1':
            real_number_of_ones += 1
            queue.push(Bucket(timestamp, 1))

        timestamp += 1

    # Searching for same given values: []
    window_size = 10
    result = queue.evaluate(timestamp - window_size)
    samples.append((window_size, result, real_number_of_ones))
    
    return samples

def getOrderedCounts(rdd):
    # We grab the two lists, and then map them into a better struct (value, weight)
    counts_dict = rdd.flatMap(lambda x: x[1])    
    return counts_dict


if __name__ == "__main__":

    quiet_logging(sc)

    # Create a DStream that will connect to the data file given
    # We need to run the program and only then, insert the files we want to count locations into this directory
    lines = ssc.socketTextStream("localhost", 9999)

    # Split each line into pairs (Timestamp, location)
    pairs = lines.map(lambda line: line)

    # We always use the same key so we can update by key
    pre_sampled_data = pairs.map(lambda bit: (0, bit)) \

    sampled_data = pre_sampled_data.updateStateByKey(dgim)

    ordered_counts = sampled_data.transform(getOrderedCounts)

    print("\nEvaluating Queries: (window, number_of_ones)")
    ordered_counts.pprint(5)

    ssc.start()             # start the computation
    ssc.awaitTermination()  # wait for the computation to terminate


Evaluating Queries: (window, number_of_ones)


[Stage 0:>                                                          (0 + 1) / 1]

23/06/05 23:15:07 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
23/06/05 23:15:07 WARN BlockManager: Block input-0-1686003307400 replicated to only 0 peer(s) instead of 1 peers
23/06/05 23:15:07 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
23/06/05 23:15:07 WARN BlockManager: Block input-0-1686003307600 replicated to only 0 peer(s) instead of 1 peers
23/06/05 23:15:08 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
23/06/05 23:15:08 WARN BlockManager: Block input-0-1686003307800 replicated to only 0 peer(s) instead of 1 peers
23/06/05 23:15:08 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
23/06/05 23:15:08 WARN BlockManager: Block input-0-1686003308000 replicated to only 0 peer(s) instead of 1 peers
23/06/05 23:15:08 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
23/06/05 23:15:08 WARN BlockManager: Block input-0-1686003308200 replicated to

[Stage 0:>                                                          (0 + 1) / 1]

23/06/05 23:16:07 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
23/06/05 23:16:07 WARN BlockManager: Block input-0-1686003367200 replicated to only 0 peer(s) instead of 1 peers
23/06/05 23:16:07 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
23/06/05 23:16:07 WARN BlockManager: Block input-0-1686003367400 replicated to only 0 peer(s) instead of 1 peers
23/06/05 23:16:07 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
23/06/05 23:16:07 WARN BlockManager: Block input-0-1686003367600 replicated to only 0 peer(s) instead of 1 peers
23/06/05 23:16:08 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
23/06/05 23:16:08 WARN BlockManager: Block input-0-1686003367800 replicated to only 0 peer(s) instead of 1 peers
23/06/05 23:16:08 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
23/06/05 23:16:08 WARN BlockManager: Block input-0-1686003368000 replicated to

[Stage 0:>                                                          (0 + 1) / 1]

23/06/05 23:17:07 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
23/06/05 23:17:07 WARN BlockManager: Block input-0-1686003427200 replicated to only 0 peer(s) instead of 1 peers
23/06/05 23:17:07 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
23/06/05 23:17:07 WARN BlockManager: Block input-0-1686003427400 replicated to only 0 peer(s) instead of 1 peers
23/06/05 23:17:07 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
23/06/05 23:17:07 WARN BlockManager: Block input-0-1686003427600 replicated to only 0 peer(s) instead of 1 peers
23/06/05 23:17:08 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
23/06/05 23:17:08 WARN BlockManager: Block input-0-1686003427800 replicated to only 0 peer(s) instead of 1 peers
23/06/05 23:17:08 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
23/06/05 23:17:08 WARN BlockManager: Block input-0-1686003428000 replicated to

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/Users/eduardosantos/.pyenv/versions/3.9.14/envs/third-mdle-assignment/lib/python3.9/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/Users/eduardosantos/.pyenv/versions/3.9.14/envs/third-mdle-assignment/lib/python3.9/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/Users/eduardosantos/.pyenv/versions/3.9.14/lib/python3.9/socket.py", line 704, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


23/06/05 23:17:27 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
23/06/05 23:17:27 WARN BlockManager: Block input-0-1686003447200 replicated to only 0 peer(s) instead of 1 peers
23/06/05 23:17:27 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
23/06/05 23:17:27 WARN BlockManager: Block input-0-1686003447400 replicated to only 0 peer(s) instead of 1 peers
23/06/05 23:17:27 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
23/06/05 23:17:27 WARN BlockManager: Block input-0-1686003447600 replicated to only 0 peer(s) instead of 1 peers


KeyboardInterrupt: 

23/06/05 23:17:28 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
23/06/05 23:17:28 WARN BlockManager: Block input-0-1686003447800 replicated to only 0 peer(s) instead of 1 peers
23/06/05 23:17:28 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
23/06/05 23:17:28 WARN BlockManager: Block input-0-1686003448000 replicated to only 0 peer(s) instead of 1 peers
23/06/05 23:17:28 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
23/06/05 23:17:28 WARN BlockManager: Block input-0-1686003448200 replicated to only 0 peer(s) instead of 1 peers
23/06/05 23:17:28 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
23/06/05 23:17:28 WARN BlockManager: Block input-0-1686003448400 replicated to only 0 peer(s) instead of 1 peers
23/06/05 23:17:28 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
23/06/05 23:17:28 WARN BlockManager: Block input-0-1686003448600 replicated to