In [5]:
import matplotlib.pyplot as plt
%matplotlib inline


## Sampling a Data Stream

The TwitterStream class defined below is used to simulate a Twitter stream. It works the same way as a list, tuple, or any other iterable that you may have worked with before --- you can loop over it to receive one tweet at a time. Each tweet may or may not contain emojis. There's also a helper function extract_emojis that extracts all emojis from a piece of text. It may also be useful to know that the variable UNICODE_EMOJI is a collection of all emojis that are circulating around the world.

In [6]:
import json
import emoji
import re

# Compiled once when the cell runs instead of on every call.
# The code points that were previously pasted as literal (partly invisible)
# characters are spelled as escapes so an editor cannot silently drop them:
#   \u200D = zero-width joiner, \u2642 = male sign, \u2640 = female sign,
#   \uFE0F = variation selector-16.
_EMOJI_PATTERN = re.compile(
    r'['
    r'\U0001F600-\U0001F64F'
    r'\U0001F300-\U0001F5FF'
    r'\U0001F680-\U0001F6FF'
    r'\U0001F700-\U0001F77F'
    r'\U0001F780-\U0001F7FF'
    r'\U0001F800-\U0001F8FF'
    r'\U0001F900-\U0001F9FF'
    r'\U0001FA00-\U0001FA6F'
    r'\U0001FA70-\U0001FAFF'
    r'\U0001F200-\U0001F251'
    r'\u200D\u2642\u2640\uFE0F'
    r']+'
)


def extract_emojis(text):
    """
    Extract all emojis from a str using regular expressions

    Each returned element is a maximal run of consecutive matching code
    points, so adjacent emojis come back as ONE string rather than one
    string per emoji. The zero-width joiner and variation selector-16 are
    part of the matched set, so they are also returned when they appear
    next to a character that is outside the listed ranges.

    Parameters
    ----------
    text : str
        The text to scan.

    Returns
    -------
    list of str
        The matched runs in order of appearance; empty if nothing matches.
    """
    return _EMOJI_PATTERN.findall(text)

class TwitterStream:
    """
    Used to simulate a Twitter stream.

    The stream reads ``data_file`` one line at a time. Every non-blank line
    is parsed as a JSON object and the value stored under its ``"text"`` key
    is produced. Calling ``iter()`` on the stream (which a ``for`` loop does
    implicitly) rewinds it to the beginning of the file, so the same object
    can be looped over several times.

    Parameters
    ----------
    data_file : str
        Path of the file holding one JSON object per line.

    Raises
    ------
    OSError
        From ``open`` if ``data_file`` cannot be opened.
    """

    def __init__(self, data_file):
        self.data_file = data_file
        self.data = self._open()

    def _open(self):
        """Open the data file for reading."""
        # The encoding is given explicitly: the default of open() depends on
        # the platform locale, and tweets routinely contain non-ASCII text.
        return open(self.data_file, "r", encoding="utf-8")

    def __iter__(self):
        return self.reset()

    def __next__(self):
        """
        Return the text of the next tweet.

        Raises StopIteration at end of file (the file handle is closed at
        that point), and keeps raising it on further calls until ``reset``.
        """
        if self.data.closed:
            raise StopIteration
        for line in iter(self.data.readline, ""):
            # Whitespace-only lines (e.g. a trailing blank line) carry no
            # JSON document and would make json.loads fail.
            if line.strip():
                return json.loads(line)["text"]
        # Exhausted: release the handle now instead of waiting for __del__.
        self.data.close()
        raise StopIteration

    def __del__(self):
        # `data` is missing when open() failed inside __init__.
        handle = getattr(self, "data", None)
        if handle is not None and not handle.closed:
            handle.close()

    def reset(self):
        """Close the current handle if needed, reopen the file, return self."""
        if not self.data.closed:
            self.data.close()
        self.data = self._open()
        return self

### Random Sampling

In [7]:
from collections import defaultdict
from random import Random

# Define HistPresvRandom
class HistPresvRandom:
    """
    History-preserving Random Number Generator

    Wraps a ``random.Random`` instance and logs every value it hands out in
    ``hist``, a ``defaultdict(list)`` keyed by the name of the method that
    produced the value ("random" or "sample").
    """

    def __init__(self, seed=None):
        self.prg = Random(seed)
        self.hist = defaultdict(list)

    def _record(self, kind, value):
        """Append ``value`` to the history list for ``kind`` and return it."""
        self.hist[kind].append(value)
        return value

    def random(self):
        """Works exactly like random.random(); the draw is logged under "random"."""
        return self._record("random", self.prg.random())

    def sample(self, population):
        """Works exactly like random.sample(population, 1)[0]; the pick is logged under "sample"."""
        (picked,) = self.prg.sample(population, 1)
        return self._record("sample", picked)
## Define random sampling algorithm
class RandomSampler:
    """
    Samples a stream by keeping each item independently with a fixed probability.

    Attributes
    ----------
    in_sample_prob : the probability threshold each random draw is compared to
    random : HistPresvRandom used for every random draw
    sample : list of the items kept so far
    counts : defaultdict(int) mapping each extracted emoji string to how many
             times it was extracted from the kept items
    """

    def __init__(self, in_sample_prob, seed=None):
        self.in_sample_prob = in_sample_prob
        # used whenever randomness is needed in your solution
        self.random = HistPresvRandom(seed)
        self.sample = list()
        # recommended to use defaultdict, but an ordinary dict works fine too
        self.counts = defaultdict(int)

    def _process_new_item(self, item):
        """
        Decide with one random draw whether ``item`` joins the sample and,
        if it does, add its emojis to the counts.
        """
        draw = self.random.random()
        if not draw < self.in_sample_prob:
            return

        self.sample.append(item)
        for symbol in extract_emojis(item):
            self.counts[symbol] += 1

    def do_sampling(self, stream):
        """
        Run random sampling over ``stream``.

        This is a generator: after each item is handled it yields a
        ``(sample, counts)`` pair of copies reflecting the state so far.
        Any previous sample and counts are discarded when iteration starts.
        """
        # start from a clean slate
        self.sample.clear()
        self.counts.clear()

        for tweet in stream:
            self._process_new_item(tweet)

            # returns a copy of sample and counts at the end of every iteration for grading - code given
            yield self.sample.copy(), self.counts.copy()

Now, let's see what the emoji distribution is after all tweets are processed.

In [8]:
# Each tweet is kept when its random draw falls below in_sample_prob;
# the seed is handed to the sampler's random number generator.
in_sample_prob = 0.1
seed = 42
ran_emo = RandomSampler(in_sample_prob, seed)

# do_sampling is a generator, so it must be consumed for the sampling to run.
# The per-step snapshots it yields are not needed here.
for _ in ran_emo.do_sampling(TwitterStream("Data/tweets")):
    pass

# Order the emoji counts from most to least frequent.
sorted_counts = dict(
    sorted(ran_emo.counts.items(), key=lambda pair: pair[1], reverse=True)
)
print(sorted_counts)

{'️': 141, '😂': 34, '🥺': 19, '😍': 16, '😭': 15, '😊': 12, '😂😂😂': 11, '🤣': 10, '🔥': 9, '😭😭': 8, '🤔': 8, '😂😂': 7, '😆': 7, '💛': 7, '👍': 7, '💕': 6, '🥰': 6, '🌟': 6, '😁': 6, '💦': 6, '😎': 6, '😢': 6, '🤭': 6, '💪': 5, '😂😂😂😂': 5, '🦋': 5, '👇': 5, '🙄': 5, '🎉': 5, '😅': 5, '😘': 4, '🌸': 4, '🌺': 4, '👏': 4, '💙': 4, '🚨🚨': 4, '🗣': 4, '😳': 4, '🥳': 4, '🏅': 4, '😌': 3, '😔': 3, '🏻': 3, '💝': 3, '😩': 3, '🔃': 3, '🤾\u200d♂️': 3, '🍑': 3, '😺': 3, '🔥🔥': 3, '🤣🤣': 3, '😈': 3, '👋': 3, '👀': 3, '💗': 3, '🐰': 3, '💖': 3, '📱': 3, '😥': 3, '💭': 3, '💚': 3, '🐶': 2, '💔': 2, '🙌': 2, '🥴': 2, '😡': 2, '😱😱😱': 2, '📢': 2, '😆😆': 2, '🖤': 2, '💎': 2, '🗣️': 2, '🥵': 2, '🎀': 2, '🔒': 2, '🙂': 2, '👏🏻': 2, '🙃': 2, '😇': 2, '😃': 2, '🥺🥺🥺': 2, '👋🏻': 2, '🙏': 2, '🤩': 2, '🎈': 2, '😍😍': 2, '💥': 2, '😴': 2, '😂🤣': 2, '🤨': 2, '👉': 2, '🥺🥺': 2, '🤣🤣🤣': 2, '😷': 2, '🎵': 2, '🙏🏾': 2, '🥇': 2, '💠': 2, '💣': 2, '👏👏': 2, '🍒': 2, '🎊': 2, '💢': 2, '🔴': 2, '💃': 2, '🙇🏻': 2, '🌚': 2, '🎁': 2, '💜': 2, '👉🏽': 2, '♂': 2, '💫': 2, '🔹': 2, '🔠': 2, '💘💘💘': 1, '😔😔': 1, '😍😍😍😍😍': 1, '😙😙😙': 1, '🙏

### Reservoir Sampling

In [9]:
from collections import defaultdict

class ReservoirSampler:
    """
    Keeps a fixed-size sample of a stream together with emoji counts for it.

    The first ``sample_size`` items are always kept. Each later item (at
    0-based position ``index``) replaces a randomly chosen kept item with
    probability ``sample_size / (index + 1)``.

    Attributes
    ----------
    sample_size : number of items the sample is filled up to
    random : HistPresvRandom used for every random draw
    sample : list of the items currently kept
    counts : defaultdict(int) mapping each extracted emoji string to how many
             times it was extracted from the items currently kept
    """

    def __init__(self, sample_size, seed=None):
        self.sample_size = sample_size
        self.random = HistPresvRandom(seed) # used whenever randomness is needed in your solution
        self.sample, self.counts = list(), defaultdict(int)

    def _add_to_sample(self, item):
        """Append ``item`` to the sample and add its emojis to the counts."""
        self.sample.append(item)
        for symbol in extract_emojis(item):
            self.counts[symbol] += 1

    def _evict(self, position):
        """Remove the item at ``position`` and subtract its emojis from the counts."""
        evicted = self.sample.pop(position)
        for symbol in extract_emojis(evicted):
            self.counts[symbol] -= 1
            # Drop exhausted keys so counts only lists emojis still in the sample.
            if self.counts[symbol] <= 0:
                del self.counts[symbol]

    def _process_new_item(self, item, index):
        """
        Decides whether a new item should be added to the sample and adjusts the counts accordingly

        ``index`` is the 0-based position of ``item`` in the stream. One
        random draw decides acceptance; on acceptance a second draw picks
        the slot to evict before ``item`` is appended.
        """
        prob = self.sample_size / (index + 1)

        # Strict '<': random() lies in [0.0, 1.0), so this accepts with
        # probability `prob`, and a draw of exactly 0.0 can no longer be
        # accepted when sample_size is 0 (which would then try to pick a
        # slot from an empty range and raise).
        if self.random.random() < prob:
            self._evict(self.random.sample(range(self.sample_size)))
            self._add_to_sample(item)

    def do_sampling(self, stream):
        """
        Iterates over a stream and performs reservoir sampling

        This is a generator: after each item is handled it yields a
        ``(sample, counts)`` pair of copies reflecting the state so far.
        Any previous sample and counts are discarded when iteration starts.
        """
        self.sample.clear() # clear the existing sample
        self.counts.clear() # clear the existing counts

        for index, item in enumerate(stream): # iterate over the stream
            if index < self.sample_size:
                # Still filling up: every item is kept, no randomness used.
                self._add_to_sample(item)
            else:
                self._process_new_item(item, index)

            # returns a copy of sample and counts at the end of every iteration for grading - code given
            yield self.sample.copy(), self.counts.copy()

Let's see what the emoji distribution is after all tweets are processed.

In [10]:
# The sampler keeps the first sample_size tweets and then swaps later ones in
# at random; the seed is handed to the sampler's random number generator.
sample_size = 100
seed = 0
stu_ans = ReservoirSampler(sample_size, seed)

# do_sampling is a generator, so it must be consumed for the sampling to run.
# The per-step snapshots it yields are not needed here.
for _ in stu_ans.do_sampling(TwitterStream("Data/tweets")):
    pass

# Order the emoji counts from most to least frequent.
sorted_counts = dict(
    sorted(stu_ans.counts.items(), key=lambda pair: pair[1], reverse=True)
)
print(sorted_counts)

{'️': 14, '💙': 7, '😂': 4, '🥰🥰🥰🥰': 3, '😭😭': 3, '👍': 2, '😍': 2, '👑': 2, '💕': 1, '😏': 1, '🎶': 1, '😘😘': 1, '️🎵': 1, '🥺🥺🥺🥺🥺🥰🥰🥰🥰🥰😭😭😭': 1, '😭😭😭😭': 1, '🐟': 1, '🐰': 1, '👫': 1, '👩\u200d': 1, '️\u200d💋\u200d👩': 1, '😈': 1, '🥰💚🏀🏈🎤': 1, '😠': 1, '🙃🙃': 1, '🌈': 1, '😋😋': 1, '🏼': 1, '😭😭😭😭😭😭😭😭😭😭': 1, '😷': 1, '🤦🏻\u200d♂️': 1, '😂😂😂😂': 1, '🤓': 1, '🤷🏿\u200d♂️': 1, '🔥': 1, '🐳': 1, '💰': 1, '🥇': 1, '👏🔥': 1, '👉': 1, '🏆🏆🏆': 1, '🦾': 1, '😞': 1, '💙📸': 1, '🤑': 1, '🙏🏾': 1, '️😍🙆🏽\u200d♂️': 1, '🍪': 1, '😇': 1, '😉': 1, '🥴': 1, '😩': 1, '🏯': 1, '🦅': 1, '😱': 1, '😓': 1, '👇': 1, '🤗': 1, '🥺💓': 1, '😍😍😍😍': 1, '😤😩': 1, '😭🐱🐥🍼💕': 1, '😪🥺': 1}
