# Practice Session 08: Data streams

In this session we will take a large corpus of documents and compute some statistics using data stream methods.

<font size="-1" color="gray">(Remove this cell when delivering.)</font>

Author: <font color="blue">Luca Franceschi</font>

E-mail: <font color="blue">luca.franceschi01@estudiant.upf.edu</font>

Date: <font color="blue">12/11/2024</font>

In [None]:
import io
import nltk
import gzip
import random
import statistics
import secrets
import re

In [None]:
# nltk.download('punkt_tab')
# nltk.download('averaged_perceptron_tagger_eng')

# 0. Dataset and how to iterate

The input file contains lines of dialogue from a set of movies in the [Movie Dialog Corpus](https://www.kaggle.com/datasets/Cornell-University/movie-dialog-corpus). We will use the file `movie_lines.tsv`, which contains the text of the dialogue: about 3 million words in about 300,000 lines of dialogue.

During this practice, **we will never load this file in memory.**

<font size="-1" color="gray">(Remove this cell when delivering.)</font>

In [None]:
# Leave this code as-is

INPUT_FILE = 'movie_lines.tsv.gz'

The function `read_by_parts_of_speech` is a [generator](https://wiki.python.org/moin/Generators), that is, a function that behaves as an iterator. This is a common pattern in stream processing; in Python it is implemented with the `yield` keyword instead of `return`.
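
As a minimal illustration of the generator pattern, the sketch below yields lowercase words one at a time from an in-memory list of lines. The name `read_words` and the sample lines are made up for this example and are not part of the practice code.

```python
def read_words(lines):
    """Yield lowercase words one at a time instead of building a full list."""
    for line in lines:
        for word in line.split():
            # Execution pauses here until the consumer asks for the next word
            yield word.lower()


stream = read_words(['Hello there', 'General Kenobi'])
first = next(stream)   # consumes only the first word
rest = list(stream)    # consumes whatever is left in the stream
print(first, rest)
```

Because the words are produced on demand, the consumer never needs to hold the whole input in memory.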

<font size="-1" color="gray">(Remove this cell when delivering.)</font>

In [None]:
# Leave this code as-is

POS_NOUN = 'NN'
POS_VERB = 'VB'
POS_ADJECTIVE = 'JJ'


# Producer in Python that reads a file word by word, keeping only the requested parts of speech
def read_by_parts_of_speech(filename, parts_of_speech, max_words=-1, report_every=-1):

    # Open the input file
    with gzip.open(filename, 'rt', encoding='utf8') as file:

        # Initialize counter of words to stop at max_words
        counter = 0

        # Iterate through lines in the file
        for line in file:

            elements = line.split('\t')

            text = ""
            if len(elements) >= 5:
                text = elements[4].strip()

            if counter > max_words and max_words != -1:
                break

            for sentence in nltk.sent_tokenize(text):

                tagged = nltk.pos_tag(nltk.word_tokenize(sentence))
                for word in [part[0] for part in tagged if part[1] in parts_of_speech]:

                    counter += 1

                    # Report
                    if (report_every != -1) and (counter % report_every == 0):
                        if max_words == -1:
                            print('- Read %d words so far' % (counter))
                        else:
                            print('- Read %d/%d words so far' % (counter, max_words))

                    # Produce the word in lowercase
                    yield word.lower()

We will do a first pass over the data. Here we will read only the first 30K words of the selected part of speech (adjectives in the cell below). Try a larger limit if your computer is fast, or a lower limit if your computer is slow. Find a limit that makes one pass take about 30 seconds and use it for development.

<font size="-1" color="gray">(Remove this cell when delivering.)</font>

In [None]:
for word in read_by_parts_of_speech(
    INPUT_FILE, [POS_ADJECTIVE], max_words=30000, report_every=10000
):
    # Print roughly 1 in every 1000 words
    if random.random() < 0.001:
        print('Current word \'%s\'' % (word))

**Tip:** NLTK may complain that you have some missing files. The following commands may help:

```python3
nltk.download('punkt_tab')
nltk.download('averaged_perceptron_tagger_eng')
```

<font size="-1" color="gray">(Remove this cell when delivering.)</font>

# 1. Determine approximately the top-10 words

Instead of loading the entire dataset in main memory, we will use reservoir sampling to determine approximately the top-10 words.

**Reservoir sampling**: In reservoir sampling, if we have a reservoir of size *s*:

* We store the first *s* elements of the stream
* When the n<sup>th</sup> element arrives (let's call it X<sub>n</sub>):
   * With probability 1 - s/n, we ignore this element.
   * With probability s/n, we:
      * Discard a random element from the reservoir
      * Add element X<sub>n</sub> to the reservoir (calling *add_to_reservoir*)
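
The steps above can be sketched on a small in-memory stream as follows. This is only an illustration under simplifying assumptions: the function name `reservoir_sample`, the `rng` parameter, and the integer stream are invented for this example, and the practice itself splits the logic into two functions instead of one.

```python
import random


def reservoir_sample(stream, s, rng=random):
    """Keep a uniform random sample of at most s items from an iterable."""
    reservoir = []
    for n, item in enumerate(stream, start=1):
        if n <= s:
            # The first s elements are always stored
            reservoir.append(item)
        elif rng.random() < s / n:
            # With probability s/n: evict a random element, then add the new one
            reservoir.pop(rng.randrange(len(reservoir)))
            reservoir.append(item)
        # Otherwise (probability 1 - s/n) the element is ignored
    return reservoir


rng = random.Random(42)  # fixed seed so the run is repeatable
sample = reservoir_sample(range(10000), s=100, rng=rng)
print(len(sample))
```

The reservoir never grows beyond `s` items, regardless of how long the stream is.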
      
<font size="-1" color="gray">(Remove this cell when delivering.)</font>

Implement a function `add_to_reservoir(reservoir, item, max_reservoir_size)` that adds an item to the reservoir, maintaining its size. If the reservoir is already of size *max_reservoir_size*, a random item is selected and evicted *before* adding the item. It is important to evict an old item *before* adding the new item. Use the following skeleton:

```python
def add_to_reservoir(reservoir, item, max_reservoir_size):
    # YOUR CODE HERE
    assert(len(reservoir) <= max_reservoir_size)
```

<font size="-1" color="gray">(Remove this cell when delivering.)</font>

<font size="+1" color="red">Replace this cell with your code for "add_to_reservoir"</font>

In [None]:
def add_to_reservoir(reservoir: list, item, max_reservoir_size):
    # If the reservoir is full, evict a random item *before* adding the new one
    if len(reservoir) >= max_reservoir_size:
        reservoir.pop(random.randrange(len(reservoir)))
    reservoir.append(item)
    assert len(reservoir) <= max_reservoir_size

Create a function to iterate through the file using the reservoir sampling method seen in class. In this function you will decide, for every item, whether to call *add_to_reservoir* or to ignore the item.

You can use the following skeleton:

```python
def reservoir_sampling(filename, parts_of_speech, reservoir_size, max_words=-1, report_every=-1):
    reservoir = []
    
    words_read = 0
    
    for word in read_by_parts_of_speech(filename, parts_of_speech, max_words=max_words, report_every=report_every):
    
        # YOUR CODE HERE

    return (words_read, reservoir)
```

<font size="-1" color="gray">(Remove this cell when delivering.)</font>

<font size="+1" color="red">Replace this cell with your code for "reservoir_sampling"</font>

In [None]:
def reservoir_sampling(
    filename, parts_of_speech, reservoir_size, max_words=-1, report_every=-1
):
    reservoir = []

    words_read = 0

    for word in read_by_parts_of_speech(
        filename,
        max_words=max_words,
        report_every=report_every,
        parts_of_speech=parts_of_speech,
    ):

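        words_read += 1

        # Always keep the first reservoir_size words; afterwards keep the
        # n-th word with probability reservoir_size / n, and ignore it otherwise
        if words_read <= reservoir_size or random.random() < reservoir_size / words_read:
            add_to_reservoir(reservoir, word, reservoir_size)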

    return (words_read, reservoir)

Test your function using the following code:

<font size="-1" color="gray">(Remove this cell when delivering.)</font>

In [None]:
# Leave this code as-is

reservoir_size = 1500
(items_seen, reservoir) = reservoir_sampling(
    INPUT_FILE, [POS_ADJECTIVE], reservoir_size, max_words=30000, report_every=10000
)

print('Number of items seen    : %d' % items_seen)
print('Number of items sampled : %d' % len(reservoir))

The reservoir contains repeated items. You can compute the absolute frequencies of the top 20 using the following code.

<font size="-1" color="gray">(Remove this cell when delivering.)</font>

In [None]:
# Leave this code as-is

freq = {}
for item in reservoir:
    freq[item] = reservoir.count(item)

most_frequent_items = sorted(
    [(frequency, word) for word, frequency in freq.items()], reverse=True
)[:20]

for absolute_frequency, word in most_frequent_items:
    print('%d %s' % (absolute_frequency, word))

Write code to compute the 20 most frequent items in the reservoir and their relative frequencies, as percentages.

<font size="-1" color="gray">(Remove this cell when delivering.)</font>

<font size="+1" color="red">Replace this cell with your code to print the top items and their relative frequencies</font>

In [None]:
for absolute_frequency, word in most_frequent_items:
    # Divide by the actual number of sampled items, which may be below reservoir_size
    print(
        '{} {} ({:.2%})'.format(
            absolute_frequency, word, absolute_frequency / len(reservoir)
        )
    )

If you see an item *C* times in the reservoir, you can estimate that the item appears *C × dataset_size / reservoir_size* times in the entire dataset (*dataset_size* is the size of the entire dataset).
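
As a worked example of this scaling, the sketch below counts the items of a toy reservoir and extrapolates each count to a dataset of 1000 items. The helper name `estimate_frequencies`, the toy reservoir, and the dataset size are invented for illustration; `collections.Counter` is used here only as a convenient way to count.

```python
from collections import Counter


def estimate_frequencies(reservoir, dataset_size, top=5):
    """Return (word, count, relative frequency, estimated dataset count) rows."""
    counts = Counter(reservoir)
    scale = dataset_size / len(reservoir)
    return [
        (word, count, count / len(reservoir), round(count * scale))
        for word, count in counts.most_common(top)
    ]


# Toy reservoir of 10 items, assumed to be sampled from a dataset of 1000 items
toy_reservoir = ['good'] * 5 + ['bad'] * 3 + ['old'] * 2
rows = estimate_frequencies(toy_reservoir, dataset_size=1000, top=2)
for word, count, relative, estimate in rows:
    print('%s %d %.2f%% %d' % (word, count, 100 * relative, estimate))
```

Here "good" appears 5 times in a reservoir of 10, so its estimated count in the dataset is 5 × 1000 / 10 = 500.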

For various sizes of the reservoir, e.g., 50, 100, 500, ..., list the top-5 words and your estimate of their frequency in the entire dataset.
 
<font size="-1" color="gray">(Remove this cell when delivering.)</font>

<font size="+1" color="red">Increase the max limit of words so that one pass takes about 2-3 minutes to be completed. Replace this cell with your code to try different reservoir sizes. In each case, print your estimate for the relative and absolute frequency of the words in the entire dataset.</font>

In [None]:
import time

In [None]:
sizes = [50, 100, 300, 500, 1000, 1500, 2999, 3000, 3001, 6000, 12000, 15000, 30000]
max_words = 30000  # increase this so that one pass takes about 2-3 minutes

for reservoir_size in sizes:
    print(f'Reservoir size = {reservoir_size} ({reservoir_size/max_words:.2%})')
    start = time.time()
    (items_seen, reservoir) = reservoir_sampling(
        INPUT_FILE, [POS_ADJECTIVE], reservoir_size, max_words=max_words, report_every=10000
    )

    freq = {}
    for item in reservoir:
        freq[item] = reservoir.count(item)
    # Keep only the top-5 words, as requested in the exercise
    most_frequent_items = sorted(
        [(frequency, word) for word, frequency in freq.items()], reverse=True
    )[:5]

    print(
        '{:<19s} \u2502 {:<8s} \u2502 {:<8s} \u2502 {:<19s}'.format(
            'word', 'abs_freq', 'rel_freq', 'E(abs_freq|dataset)'
        )
    )
    print(
        '{:\u2500<20s}\u253C{:\u2500<10s}\u253C{:\u2500<10s}\u253C{:\u2500<21s}'.format(
            '', '', '', ''
        )
    )
    for absolute_frequency, word in most_frequent_items:
        print(
            '{:<19s} \u2502 {:^8d} \u2502 {:^8.2%} \u2502 {:^20d}'.format(
                word,
                absolute_frequency,
                absolute_frequency / reservoir_size,
                round(absolute_frequency * items_seen / reservoir_size),
            )
        )

    end = time.time()
    print(
        '========================= TIME: {:.2f}s ========================='.format(
            end - start
        )
    )

Find by trial and error, and include in your report, the minimum reservoir size you need to have somewhat stable results (e.g., the same top-3 words in two consecutive runs of the algorithm).

<font size="-1" color="gray">(Remove this cell when delivering.)</font>

<font size="+1" color="red">Replace this cell with a brief commentary indicating what reservoir size you would recommend to use, and your overall conclusions.</font>

With reservoir size around 3000 the results tend to stabilize.

TODO: finish

# 2. Determine approximately the number of distinct words

We will estimate the number of distinct words without creating a dictionary or hash table, but instead, we will use the Flajolet-Martin probabilistic counting method.

**Flajolet-Martin probabilistic counting**:

* For several passes
   * Create hash function *h*
   * For every element *u* in the stream:
      * Compute hash value *h(u)*
      * Let *r(u)* be the number of trailing zeroes in *h(u)*
      * Maintain *R* as the maximum value of *r(u)* seen so far
   * Add *2<sup>R</sup>* as an estimate for the number of distinct elements *u* seen
* The final estimate is the average or the median of the estimates found in each pass
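
The procedure above can be sketched on a tiny in-memory stream as follows. This is a hedged illustration, not the solution expected for the practice: the names `trailing_zeroes`, `make_hash`, `flajolet_martin` and `stream_factory` are invented here, and a salted `hashlib.sha256` is swapped in for the notebook's random hash function so that the output is reproducible between runs.

```python
import hashlib
import statistics


def trailing_zeroes(number):
    """Count trailing zero bits; zero is treated as having none."""
    if number == 0:
        return 0
    count = 0
    while number & 1 == 0:
        count += 1
        number >>= 1
    return count


def make_hash(salt):
    """Build a deterministic hash function from a salt string (assumption: sha256)."""
    def h(word):
        digest = hashlib.sha256((salt + word).encode('utf8')).digest()
        return int.from_bytes(digest[:8], 'big')
    return h


def flajolet_martin(stream_factory, number_of_passes):
    """Return one estimate 2^R per pass; stream_factory() restarts the stream."""
    estimates = []
    for i in range(number_of_passes):
        h = make_hash('salt-%d-' % i)  # a different hash function on every pass
        R = 0
        for word in stream_factory():
            R = max(R, trailing_zeroes(h(word)))
        estimates.append(2 ** R)
    return estimates


words = ['good', 'bad', 'good', 'big', 'little', 'bad', 'old', 'new', 'good']
estimates = flajolet_martin(lambda: iter(words), number_of_passes=5)
print(statistics.mean(estimates), statistics.median(estimates))
```

Repeated words hash to the same value, so they cannot change *R*; this is why the method estimates the number of distinct elements rather than the stream length.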

<font size="-1" color="gray">(Remove this cell when delivering.)</font>

Use this function to count trailing zeroes in the binary representation of a number.

<font size="-1" color="gray">(Remove this cell when delivering.)</font>

In [None]:
# Leave this code as-is


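def count_trailing_zeroes(number):
    # Zero has no set bit, so the loop below would never end;
    # treat it as having no trailing zeroes
    if number == 0:
        return 0
    count = 0
    while number & 1 == 0:
        count += 1
        number = number >> 1
    return count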

Use this function to generate a random hash function. Note this generates a function, so you can do `hash_function = random_hash_function()` and then call `hash_function(x)` to compute the hash value of `x`. 

We want to make sure each hash is different, so we will create each hash function with a different [salt](https://en.wikipedia.org/wiki/Salt_(cryptography)), which is an additional input that we will take using a good random string generator from the [secrets](https://docs.python.org/3/library/secrets.html) library.

<font size="-1" color="gray">(Remove this cell when delivering.)</font>

In [None]:
# Leave this code as-is


def random_hash_function():
    # We use a cryptographically safe generator for the salt of our hash function
    salt = secrets.token_bytes(32)
    return lambda string: hash(string + str(salt))

Perform *number_of_passes* passes over the file, reading the entire file on each pass (we don't use the reservoir in this part). In each pass, create a new hash function and use it to hash words. Keep the maximum number of trailing zeroes seen in the hash value of a word.

```python
number_of_passes = 5
estimates = []

for i in range(number_of_passes):
    # YOUR_CODE_HERE: read the file and generate an estimate
    
    estimates.append(estimate)
    print("Estimate on pass %d: %d distinct words" % (i+1, estimate))
```

<font size="-1" color="gray">(Remove this cell when delivering.)</font>

<font size="+1" color="red">Replace this cell with your code to perform the requested number of passes.</font>

In [None]:
# Leave this code as-is

print("* Average of estimates: %.1f" % statistics.mean(estimates))
print("* Median  of estimates: %.1f" % statistics.median(estimates))

You can increase the limit of words to read (but do not use more than 5 minutes of computing time), and perform 10 passes.

<font size="-1" color="gray">(Remove this cell when delivering.)</font>

<font size="+1" color="red">Compute the median of average estimates in 3 separate runs of your algorithm; each run should do 10 passes over the file. Repeat this for nouns (POS_NOUN), adjectives (POS_ADJECTIVE), and verbs (POS_VERB). Replace this cell with the results you obtained in each pass, and whether the average or the median seems more appropriate for this probabilistic counting.</font>

# DELIVER (individually)

Remember to read the section on "delivering your code" in the [course evaluation guidelines](https://github.com/chatox/data-mining-course/blob/master/upf/upf-evaluation.md).

Deliver a zip file containing:

* This notebook

## Extra points available

For more learning and extra points, notice that the number of **distinct** words in a corpus, as a function of the **total** number of words in the corpus, follows an empirical law known as [Heaps' law](https://en.wikipedia.org/wiki/Heaps%27_law).

Repeat the probabilistic counting experiment for various values of `max_words` and plot the total number of words read versus the number of distinct words (remember to label the axes). Check whether it follows Heaps' law for nouns, verbs, and adjectives.

Please note that probabilistic counting introduces a substantial amount of noise, so Heaps' law may not be clearly visible in your plot.
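
Heaps' law is usually written as *V(n) = K · n<sup>β</sup>*, where *n* is the total number of words and *V(n)* the number of distinct words. One possible way to check it, sketched below, is a least-squares line fit in log-log space. The function name `fit_heaps_law` is invented for this example, and the data points are synthetic values generated from *K = 10* and *β = 0.5*, not measurements from the corpus.

```python
import math


def fit_heaps_law(total_words, distinct_words):
    """Fit log V = log K + beta * log n by ordinary least squares."""
    xs = [math.log(n) for n in total_words]
    ys = [math.log(v) for v in distinct_words]
    mean_x = sum(xs) / len(xs)
    mean_y = sum(ys) / len(ys)
    beta = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys)) / sum(
        (x - mean_x) ** 2 for x in xs
    )
    K = math.exp(mean_y - beta * mean_x)
    return K, beta


# Synthetic points that follow V = 10 * n^0.5 exactly (illustration only)
totals = [1000, 5000, 10000, 50000]
distincts = [10 * n ** 0.5 for n in totals]
K, beta = fit_heaps_law(totals, distincts)
print('K = %.2f, beta = %.2f' % (K, beta))
```

With real Flajolet-Martin estimates, which are powers of two, the points will scatter around the fitted line rather than lie on it.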

**Note:** if you go for the extra points, add ``<font size="+2" color="blue">Additional results: Heap's law</font>`` at the top of your notebook. 

<font size="-1" color="gray">(Remove this cell when delivering.)</font>

<font size="+2" color="#003300">I hereby declare that, except for the code provided by the course instructors, all of my code, report, and figures were produced by myself.</font>