# Stream data

Data: the English split of the [OSCAR dataset](https://huggingface.co/datasets/oscar) from [Hugging Face Datasets](https://huggingface.co/docs/datasets/index).

### Installation

Install [Hugging Face datasets](https://huggingface.co/docs/datasets/installation)


### Data detail

name : unshuffled_deduplicated_en

Train deduplicated : **304,230,423**

Words deduplicated : **215,841,256,971**

Size deduplicated : **1.2T**

Because the dataset is too large to download in full, we load it in streaming mode.


### Example input

In [1]:
from datasets import load_dataset
dataset = load_dataset('oscar', "unshuffled_deduplicated_en", split='train', streaming=True)



In [2]:
shuffled_dataset = dataset.shuffle(seed=42, buffer_size=10000)

In [3]:
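def traverse_dataset(dataset, batch_size=1000):
    # Count the examples in the stream, batch_size examples at a time.
    # The dataset is passed in as an argument: assigning to the global
    # name inside the function would raise UnboundLocalError.
    total = 0
    iters = 0
    while True:
        subset = list(dataset.take(batch_size))
        if not subset:  # stream exhausted
            break
        total += len(subset)
        # skip() does not seek: iterating the result passes over the
        # skipped examples again, so every round is slower than the last.
        dataset = dataset.skip(batch_size)
        iters += 1
        print(iters,end='\r')
    print(total)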
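
Batching does not have to go through repeated `take`/`skip` calls; a single pass over the stream can be cut into batches instead. A minimal sketch, assuming only that the dataset behaves like an ordinary Python iterable. `iter_batches` and `fake_stream` are hypothetical names used for illustration, not part of the `datasets` API:

```python
from itertools import islice

def iter_batches(iterable, batch_size):
    """Yield lists of up to batch_size items from a single pass over iterable."""
    it = iter(iterable)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch

# Stand-in for a streamed dataset: any iterable of examples works here.
fake_stream = ({"id": i, "text": f"doc {i}"} for i in range(2500))
batch_sizes = [len(batch) for batch in iter_batches(fake_stream, 1000)]
print(batch_sizes)  # [1000, 1000, 500]
```

Because the iterator is consumed once, the cost per batch stays constant instead of growing with the number of examples already skipped.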

### Task: Word Count

**Task 1**: Find the 100 most frequent words and their counts. (Punctuation marks should be removed.)

**Task 2**: Count the number of distinct elements.

In [4]:
import nltk
from nltk import word_tokenize
import numpy as np
from tqdm import tqdm
import time

# nltk.download()

In [5]:
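def is_punctuation(word:str):
    # True for every token that is neither purely alphabetic nor purely
    # numeric, so mixed tokens such as "covid19" or "n't" are filtered too.
    return not (word.isalpha() or word.isnumeric())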
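
To make the effect of this filter concrete, here is a small check on hand-written tokens (the token list is made up for illustration). It shows that the rule drops more than punctuation: anything mixing letters and digits, or containing an apostrophe, is removed as well.

```python
def is_punctuation(word: str) -> bool:
    # Same rule as in the cell above.
    return not (word.isalpha() or word.isnumeric())

tokens = ["Hello", ",", "2023", "covid19", "n't", "naïve", "..."]
kept = [t.lower() for t in tokens if not is_punctuation(t)]
print(kept)  # ['hello', '2023', 'naïve']
```

If mixed tokens should be kept, `str.isalnum` would be one option, at the cost of changing the word counts reported here.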

In [17]:
def misra_greis_v0(dataset,k=100,log_interval=10000,max_iter=100000):
    log_file = './log_mg_v0.txt'
    with open(log_file,'w') as f:
        f.truncate()
    D = {}
    start = time.time()
    for idx,example in enumerate(tqdm(dataset)):
        tokens = word_tokenize(example['text'])
        
        for token in tokens:
            if is_punctuation(token):
                continue
            token = token.lower()
            
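            if token in D:
                # Already tracked: increment its counter.
                D[token] += 1
            elif len(D) < k - 1:
                # A free slot among the k-1 counters: start tracking the token.
                D[token] = 1
            else:
                # All counters are in use: decrement every counter and drop
                # those that reach zero (the new token itself is not stored).
                del_keys = []
                
                for key in D:
                    if D[key] == 1:
                        del_keys.append(key)
                    else:
                        D[key] -= 1
                        
                for key in del_keys:
                    del D[key]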
                    
        if idx % log_interval == 0:
            with open(log_file,'a') as f:
                f.write(f'Iteration {idx}: '
                        +''.join([f"{k}:{v} " for k,v in D.items()])
                        +f'Elapsed time: {time.time()-start} seconds'
                        +'\n')
        if idx == max_iter:
            break
    return D
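
The update rule can be checked on a small in-memory stream where the exact counts are known. The sketch below is a stripped-down version of the same idea, without tokenization or logging; `misra_gries` and the toy stream are illustrative names, not code from this notebook.

```python
from collections import Counter

def misra_gries(stream, k):
    """Misra-Gries summary that keeps at most k-1 counters."""
    counters = {}
    for item in stream:
        if item in counters:
            counters[item] += 1
        elif len(counters) < k - 1:
            counters[item] = 1
        else:
            # Decrement every counter and drop the ones that reach zero.
            for key in list(counters):
                counters[key] -= 1
                if counters[key] == 0:
                    del counters[key]
    return counters

stream = ["a"] * 50 + ["b"] * 30 + list("cdefghij") * 2 + ["a"] * 10
summary = misra_gries(stream, k=4)
exact = Counter(stream)
print(summary)  # {'a': 52, 'b': 22}
print(exact["a"], exact["b"])  # 60 30
```

The summary never overestimates: each reported count is below the exact count by at most the number of decrement rounds (8 in this stream).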

In [18]:
def misra_greis_v1(dataset,k=1000,log_interval=1000,max_iter=100000):
    log_file = './log_mg_v1.txt'
    with open(log_file,'w') as f:
        f.truncate()
    D = {}
    iter_count = 0
    
    start = time.time()
    while iter_count < max_iter:
        examples = list(dataset.take(log_interval))

        for example in examples:
            tokens = word_tokenize(example['text'])
            
            for token in tokens:
                if is_punctuation(token):
                    continue
                token = token.lower()
                
                if token in D.keys():
                    D[token] = D[token] + 1
                elif len(D.keys()) < k - 1:
                    D[token] = 1
                else:
                    del_keys = []
                    
                    for key in D.keys():
                        if D[key] == 1:
                            del_keys.append(key)
                        else:
                            D[key] = D[key] - 1
                            
                    for key in del_keys:
                        del D[key]
                        
        with open(log_file,'a') as f:
            f.write(f'Iteration {iter_count}: '
                    +''.join([f"{k}:{v} " for k,v in D.items()])
                    +f'Elapsed time: {time.time()-start} seconds'
                    +'\n')
            
        iter_count += log_interval
        dataset = dataset.skip(log_interval)
    return D

In [29]:
def misra_greis_v2(dataset,k = 10000,showed_k=100,log_interval=1000,max_iter=1000000):

    log_file = './log_mg_v2.txt'
    with open(log_file,'w',encoding='utf-8') as f:
        f.truncate()

    D = {}
    start = time.time()
    for idx,example in enumerate(tqdm(dataset)):
        tokens = word_tokenize(example['text'])
        for token in tokens:
            if is_punctuation(token):
                continue
            token = token.lower()
                
            if token in D.keys():
                D[token] = D[token] + 1
            elif len(D.keys()) < k - 1:
                D[token] = 1
            else:
                del_keys = []
                
                for key in D.keys():
                    if D[key] == 1:
                        del_keys.append(key)
                    else:
                        D[key] = D[key] - 1
                for key in del_keys:
                    del D[key]
        
        if idx % log_interval == 0:
            with open(log_file,'a',encoding='utf-8') as f:
                _D = sorted(D.items(), key=lambda x: x[1],reverse=True)
                _D = _D[:showed_k]
                f.write(f'Iteration {idx}: '
                    +''.join([f"{k}:{v} " for k,v in _D])
                    +f'Elapsed time: {time.time()-start} seconds '
                    +f'Number of keys: {len(D.keys())}'
                    +'\n')
        if idx == max_iter:
            break
    D = sorted(D.items(), key=lambda x: x[1], reverse=True)
    D = D[:showed_k]
    return D

In [30]:
most_frequent_words = misra_greis_v2(shuffled_dataset)
# most_frequent_words = countmin_v0(shuffled_dataset)
print(most_frequent_words)

6574it [10:44, 122.04it/s]Got disconnected from remote data host. Retrying in 5sec [1/20]
100000it [22:19, 74.67it/s]

[('the', 3463451), ('and', 1979315), ('to', 1819742), ('of', 1716121), ('a', 1487039), ('in', 1197180), ('is', 813932), ('for', 729666), ('that', 664943), ('you', 600858), ('i', 600431), ('it', 559520), ('with', 550455), ('on', 488994), ('as', 406294), ('this', 405617), ('are', 391063), ('be', 366631), ('or', 316918), ('was', 312968), ('your', 303723), ('have', 302857), ('at', 302483), ('from', 292705), ('by', 292501), ('we', 285226), ('not', 253307), ('can', 241299), ('an', 236567), ('s', 233697), ('will', 229643), ('but', 219676), ('they', 197313), ('all', 193628), ('has', 181710), ('he', 181532), ('my', 174296), ('if', 173626), ('his', 172557), ('more', 170018), ('one', 169669), ('their', 159456), ('our', 158972), ('so', 153813), ('about', 147151), ('which', 141259), ('there', 137560), ('when', 133972), ('do', 132934), ('up', 126273), ('out', 125072), ('what', 122262), ('also', 120439), ('who', 119955), ('time', 117469), ('new', 115229), ('like', 110030), ('some', 106133), ('been', 




In [31]:
def get_frequent_words(dataset,k=100,max_iter=1000):
    words_all = []
    for idx, data in enumerate(dataset):
        words = word_tokenize(data['text'])
        words_all += [w.lower() for w in words if not is_punctuation(w)]
        if(idx > max_iter):
            break
    freq_dist = nltk.FreqDist(words_all)
    return [hh[0] for hh in freq_dist.most_common(k)]

In [32]:
import hashlib
import nltk
from nltk import word_tokenize
import numpy as np
#nltk.download('punkt')
h1 = hashlib.md5()
h2 = hashlib.sha1()
h3 = hashlib.sha224()
h4 = hashlib.sha256()
h5 = hashlib.sha384()
h6 = hashlib.sha512()
h7 = hashlib.sha3_224()
h8 = hashlib.sha3_256()
hash_functions = [h1,h2,h3,h4,h5,h6,h7,h8]
heavy_hitters = get_frequent_words(dataset,100)
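# Hash functions cannot be inverted, so the actual heavy-hitter words cannot be recovered from the sketch itself.
# We therefore count exactly over a sample first, taking the first few examples to pick the heavy hitters.
# Once the sketch has been built from all the data, its counts are matched to those heavy hitters.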

In [33]:
def countmin_v0(dataset,d=8,w=65536,log_interval=1000,max_iter=100000):
    '''
    For Task 1
    Count-Min Sketch (Cormode-Muthukrishnan) from Lec5
    :param dataset: dataset
    :param d: number of hash functions
    :param w: domain of hash functions
    :param log_interval: log interval
    :param max_iter: max stream item to process
    :return D: dictionary of top-k items
    '''
    log_file = './log_cm_v0.txt'
    with open(log_file,'w') as f:
        f.truncate()
    
    sketch = np.zeros((d,w))
    D = {}
    
    start = time.time()
    for idx,example in enumerate(tqdm(dataset)):
        tokens = word_tokenize(example['text'])
        
        for token in tokens:
            if is_punctuation(token):
                continue
            token = token.lower()
            token_bytes = token.encode('utf-8')
            
            for hash_idx,h in enumerate(hash_functions):
                # Hash the token with a fresh object: update() on a shared
                # hashlib object would accumulate every earlier token.
                h_value = int(hashlib.new(h.name,token_bytes).hexdigest(),16)
                h_value = h_value % w
                sketch[hash_idx,h_value] += 1
            
        if idx % log_interval == 0:
            # print(f"Iteration{idx} Elapsed time: {time.time()-start} seconds")
            for word in heavy_hitters:
                word_bytes = word.encode('utf-8')
                min_count = np.inf
                
                for hash_idx,h in enumerate(hash_functions):
                    h_value = int(hashlib.new(h.name,word_bytes).hexdigest(),16)
                    h_value = h_value % w
                    min_count = min(min_count,sketch[hash_idx,h_value])
                D[word] = min_count
                
            with open(log_file,'a') as f:
                f.write(f'Iteration {idx}: '
                        +''.join([f"{k}:{int(v)} " for k,v in D.items()])
                        +f'Elapsed time: {time.time()-start} seconds'
                        +'\n')
        if idx == max_iter:
            break
    return D
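
For comparison, here is a compact Count-Min sketch that derives its rows from one hash function with different salts rather than from eight different digest algorithms. This is a sketch under assumptions, not the method used above: the class name, the use of `hashlib.blake2b` with a per-row salt, and the small `depth`/`width` defaults are all choices made for illustration.

```python
import hashlib
from collections import Counter

class CountMinSketch:
    def __init__(self, depth=4, width=1024):
        self.depth = depth
        self.width = width
        self.table = [[0] * width for _ in range(depth)]

    def _buckets(self, item):
        data = item.encode("utf-8")
        for row in range(self.depth):
            # The row index is used as a salt, giving one hash function per row.
            digest = hashlib.blake2b(data, digest_size=8,
                                     salt=row.to_bytes(2, "big")).digest()
            yield row, int.from_bytes(digest, "big") % self.width

    def add(self, item, count=1):
        for row, col in self._buckets(item):
            self.table[row][col] += count

    def estimate(self, item):
        # Collisions can only inflate a cell, so the minimum is the best guess.
        return min(self.table[row][col] for row, col in self._buckets(item))

words = "the cat and the dog and the bird".split()
cms = CountMinSketch()
for word in words:
    cms.add(word)
exact = Counter(words)
print(cms.estimate("the"), exact["the"])
```

An estimate is never below the true count; how far above it can be depends on `width` and on how many tokens have been added.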

In [34]:
def flajolet_martin_base(dataset,L=32,log_interval=1000,max_iter=100000):
    hash_function = hashlib.md5()
    for idx,example in enumerate(tqdm(dataset)):
        tokens = word_tokenize(example['text'])
        
        for token in tokens:
            if is_punctuation(token):
                continue
            token = token.lower()
            token_bytes = token.encode('utf-8')
            
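            # Hash each token on its own; update() on a shared hashlib object
            # would mix in all earlier tokens, so repeats would hash differently.
            h_value = int(hashlib.new(hash_function.name,token_bytes).hexdigest(),16)
            h_value = h_value % (2**L)
            
            # Find the position of the lowest set bit in the binary representation of h_value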
            

In [35]:
def morris_v0(dataset,hash_function,L=32,log_interval=1000,max_iter=1000000):
    log_file = './log_mo_v0.txt'
    with open(log_file,'w') as f:
        f.truncate()
    BIG_NUMBER = 2**L
    X = 0
    PHI = 0.77351
    
    start = time.time()
    for idx,example in enumerate(tqdm(dataset)):
        tokens = word_tokenize(example['text'])
        
        for token in tokens:
            if is_punctuation(token):
                continue
            token = token.lower()
            token_bytes = token.encode('utf-8')
            
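            # Hash each token on its own; update() on a shared hashlib object
            # would mix in all earlier tokens, so repeats would hash differently.
            h_value = int(hashlib.new(hash_function.name,token_bytes).hexdigest(),16)
            h_value = h_value % (BIG_NUMBER)
            
            # Find the position of the lowest set bit in the binary representation of h_value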
            lsb = 0
            while h_value % 2 == 0:
                lsb += 1
                h_value = h_value >> 1
                if lsb == L:
                    break
                
            X = max(X,lsb)
            
        if idx % log_interval == 0:
            esti = 2**X/PHI
            with open(log_file,'a') as f:
                f.write(f'Iteration {idx}: counting'
                        +f' {int(esti)} '
                        +f'Elapsed time: {time.time()-start} seconds'
                        +'\n')
        if idx == max_iter:
            break
    return 2**X/PHI
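
A distinct-count estimator should give the same answer no matter how often each token repeats. The sketch below isolates the trailing-zero estimate on an in-memory token list so that this property can be checked directly; `trailing_zeros`, `fm_estimate` and the synthetic tokens are illustrative names, and the constant 0.77351 is the `PHI` value used above.

```python
import hashlib

def trailing_zeros(x, max_bits=32):
    """Number of trailing zero bits of x (max_bits if x == 0)."""
    if x == 0:
        return max_bits
    return (x & -x).bit_length() - 1

def fm_estimate(tokens, bits=32, phi=0.77351):
    max_tz = 0
    for token in tokens:
        # A fresh hash per token: equal tokens always map to equal values.
        digest = hashlib.md5(token.encode("utf-8")).digest()
        h = int.from_bytes(digest, "big") % (1 << bits)
        max_tz = max(max_tz, trailing_zeros(h, bits))
    return 2 ** max_tz / phi

tokens = [f"word{i}" for i in range(1000)]
once = fm_estimate(tokens)
thrice = fm_estimate(tokens * 3)
print(once == thrice)  # True: repeats do not change the estimate
```

With a single maximum the estimate is always a power of two divided by `phi`, so it is coarse; splitting tokens into groups is the usual way to reduce that variance.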
                

In [36]:
def morris_v1(dataset,hash_function,L=32,log_interval=1000,max_iter=1000000):
    # HyperLogLog
    # Uses ...
    log_file = './log_mo_v1.txt'
    with open(log_file,'w') as f:
        f.truncate()
    BIG_NUMBER = 2**L
    X = 0
    PHI = 0.77351
    
    start = time.time()
    for idx,example in enumerate(tqdm(dataset)):
        tokens = word_tokenize(example['text'])
        
        for token in tokens:
            if is_punctuation(token):
                continue
            token = token.lower()
            token_bytes = token.encode('utf-8')
            
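            # Hash each token on its own; update() on a shared hashlib object
            # would mix in all earlier tokens, so repeats would hash differently.
            h_value = int(hashlib.new(hash_function.name,token_bytes).hexdigest(),16)
            h_value = h_value % (BIG_NUMBER)
            
            # Find the position of the lowest set bit in the binary representation of h_value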
            lsb = 0
            while h_value % 2 == 0:
                lsb += 1
                h_value = h_value >> 1
                if lsb == L:
                    break
                
            X = max(X,lsb)
            
        if idx % log_interval == 0:
            esti = 2**X/PHI
            with open(log_file,'a') as f:
                f.write(f'Iteration {idx}: counting'
                        +f' {int(esti)} '
                        +f'Elapsed time: {time.time()-start} seconds'
                        +'\n')
        if idx == max_iter:
            break
    return 2**X/PHI

In [47]:
def morris_v2(dataset,hash_function,L=32,log_interval=1000,max_iter=1000000,m=256):
    # HyperLogLog
    # Uses ...
    log_file = './log_mo_v2.txt'
    with open(log_file,'w') as f:
        f.truncate()
    BIG_NUMBER = 2**L
    X = [0]*m

    PHI = 0.77351
    
    start = time.time()
    for idx,example in enumerate(tqdm(dataset)):
        tokens = word_tokenize(example['text'])
        
        for token in tokens:
            if is_punctuation(token):
                continue
            token = token.lower()
            token_bytes = token.encode('utf-8')
            
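            # Hash each token on its own; update() on a shared hashlib object
            # would mix in all earlier tokens, so repeats would hash differently.
            h_value = int(hashlib.new(hash_function.name,token_bytes).hexdigest(),16)
            h_value = h_value % (BIG_NUMBER)
            group_index = h_value % m
            # Find the position of the lowest set bit in the binary representation of h_value
            lsb = 0
            h_value = h_value // m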
            while h_value % 2 == 0:
                lsb += 1
                h_value = h_value >> 1
                if lsb == L:
                    break
            X[group_index] = max(X[group_index],lsb)
        
        avg_X = np.nanmean(X)
        if idx % log_interval == 0:
            esti = 2**avg_X/PHI*m
            with open(log_file,'a') as f:
                f.write(f'Iteration {idx}: counting'
                        +f' {int(esti)} '
                        +f'Elapsed time: {time.time()-start} seconds'
                        +'\n')
        if idx == max_iter:
            break
    return (2**avg_X)/PHI*m
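
`morris_v2` combines the per-group maxima with an arithmetic mean. HyperLogLog combines them with a harmonic mean and a bias constant instead. The following is a minimal sketch of that variant, assuming a 64-bit `blake2b` hash, `m = 2**p` registers, the commonly quoted constant `0.7213 / (1 + 1.079 / m)` for `m >= 128`, and no small-range or large-range corrections; `hll_estimate` is a hypothetical name.

```python
import hashlib

def hll_estimate(tokens, p=8):
    """HyperLogLog-style estimate with m = 2**p registers."""
    m = 1 << p
    registers = [0] * m
    for token in tokens:
        digest = hashlib.blake2b(token.encode("utf-8"), digest_size=8).digest()
        h = int.from_bytes(digest, "big")
        bucket = h & (m - 1)   # low p bits choose the register
        rest = h >> p          # remaining 64 - p bits
        # rho: 1-based position of the lowest set bit in the remaining bits
        rho = (rest & -rest).bit_length() if rest else 64 - p + 1
        registers[bucket] = max(registers[bucket], rho)
    alpha = 0.7213 / (1 + 1.079 / m)  # bias constant, intended for m >= 128
    return alpha * m * m / sum(2.0 ** -r for r in registers)

tokens = [f"word{i}" for i in range(50000)]
estimate = hll_estimate(tokens)
print(round(estimate))
```

With 256 registers the typical relative error is around 1.04 / sqrt(256), roughly 6.5%, and feeding the same tokens twice leaves the estimate unchanged.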

In [6]:
# MapReduce
import time
import os
from multiprocessing import Process,Manager,Pool

def data(path):  # data extracts the document text and collects it into a list
    lis = []
    file = open(path,'r',encoding='utf-8')
    txt = file.readline()
    n = 1
    for txt in file:
        if n%6==4:
            content = txt.strip('\n')
            content = content.replace('<content>','') # strip the markup that carries no information
            content = content.replace('</content>','')
            if len(content)>0:
                lis.append(content)
        n+=1
    file.close()
    return lis  # all documents collected into one list

def Map_v0(sentences,lis):  # Map tokenizes each text and stores the (word, 1) pairs in the list
    print('process %d start'%os.getpid())
    for snt_idx,snt in enumerate(sentences):
        words = word_tokenize(snt['text'])
        
        print('process %d working on sentence %d'%(os.getpid(),snt['id']))
        
        for wd in words:
            if not is_punctuation(wd):
                lis.append((wd,1))
                
        if snt_idx % 1000 == 0:
            print('process %d has done %d sentences'%(os.getpid(),snt_idx))
    #print(len(lis))

def Reduce_v0(lis):  # Reduce aggregates the results into a dictionary
    # print('time2 = %f'%(time.time()-start_time))  # total time of the Map phase (total tokenization time)
    dic = {}
    for k,v in lis:
        dic[k] = dic.get(k,0)+v  # add the emitted value rather than a constant 1
        #print(dic)
    dic_order=sorted(dic.items(),key=lambda x:x[1],reverse=True)  # sort the dictionary in descending order
    with open('log_mr_v0.txt','a',encoding='utf-8') as file:
        for k,v in dic_order:
            file.write(k+':'+str(v)+'\n')  # write the results to the file

def Map_Reduce_v0(dataset,num_workers=6,item_per_worker=10000,max_iter=100000):
    start_time = time.time()
    
    log_file = './log_mr_v0.txt'
    with open(log_file,'w') as f:
        f.truncate()

    plist = []
    # TODO: rewrite this with Pool instead of a list of Process objects
    # pool = Pool(num_workers)
    m = Manager()
    managed_list = m.list([])
    
    print('time1 (Initialize) = %f'%(time.time()-start_time))  # time spent on setup (document extraction)
    
    for i in range(max_iter//item_per_worker):   # create the processes
        sample = dataset.take(item_per_worker)
        p = Process(target=Map_v0,args=(sample,managed_list))
        plist.append(p)
        # pool.apply_async(Map_v0,args=(sample,managed_list))
        dataset = dataset.skip(item_per_worker)
        print('time (Initialize process %d) = %f'%(i,time.time()-start_time))
    # pool.close()
    # pool.join()  
    for p in plist:
        p.start()  # start the process
    for p in plist:
        p.join()  # block the main process until the worker finishes
    print('time2 (Map) = %f'%(time.time()-start_time))  # total time of the Map phase
    
    Reduce_v0(managed_list) # once all Map processes have finished, Reduce aggregates the results
    
    print('time3 (Reduce) = %f'%(time.time() - start_time))   # total elapsed time
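
The map and reduce steps can also be written so that each worker returns a small partial count instead of appending every `(word, 1)` pair to a shared list. The sketch below runs the map step sequentially on made-up chunks to stay self-contained; `map_chunk` and `reduce_counts` are hypothetical names, and whitespace splitting stands in for `word_tokenize`.

```python
from collections import Counter
from functools import reduce

def map_chunk(texts):
    """Map step: count the words of one chunk of texts."""
    counts = Counter()
    for text in texts:
        counts.update(w.lower() for w in text.split() if w.isalpha())
    return counts

def reduce_counts(left, right):
    """Reduce step: merge one partial count into the running total."""
    left.update(right)
    return left

chunks = [
    ["the cat sat", "the dog ran"],
    ["a cat and a dog", "the end"],
]
# Sequential here; the same function could be handed to multiprocessing.Pool.map.
partials = [map_chunk(chunk) for chunk in chunks]
total = reduce(reduce_counts, partials, Counter())
print(total["the"], total["cat"], total["dog"])  # 3 2 2
```

Returning one `Counter` per chunk keeps the data exchanged between processes proportional to the number of distinct words rather than to the number of tokens.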


In [7]:
# most_frequent_words = misra_greis_v2(shuffled_dataset,100)
# most_frequent_words = countmin_v0(shuffled_dataset)
Map_Reduce_v0(shuffled_dataset)
# print(most_frequent_words)

time1 (Initialize) = 0.131585
time (Initialize process 0) = 0.131585
time (Initialize process 1) = 0.132582
time (Initialize process 2) = 0.132582
time (Initialize process 3) = 0.132582
time (Initialize process 4) = 0.134089
time (Initialize process 5) = 0.134089
time (Initialize process 6) = 0.135089
time (Initialize process 7) = 0.135089
time (Initialize process 8) = 0.136086
time (Initialize process 9) = 0.136086


In [49]:
card_count = morris_v2(shuffled_dataset,hashlib.md5())
print(card_count)

280611it [44:49, 206.71it/s]Got disconnected from remote data host. Retrying in 5sec [1/20]
Got disconnected from remote data host. Retrying in 5sec [2/20]
Got disconnected from remote data host. Retrying in 5sec [3/20]
Got disconnected from remote data host. Retrying in 5sec [4/20]
Got disconnected from remote data host. Retrying in 5sec [5/20]
Got disconnected from remote data host. Retrying in 5sec [6/20]
Got disconnected from remote data host. Retrying in 5sec [7/20]
359980it [1:52:46, 142.88it/s]Got disconnected from remote data host. Retrying in 5sec [1/20]
408177it [2:13:28, 189.69it/s]Got disconnected from remote data host. Retrying in 5sec [1/20]
439365it [2:33:22, 211.50it/s]Got disconnected from remote data host. Retrying in 5sec [1/20]
460974it [3:05:35, 119.69it/s]Got disconnected from remote data host. Retrying in 5sec [1/20]
Got disconnected from remote data host. Retrying in 5sec [2/20]
Got disconnected from remote data host. Retrying in 5sec [3/20]
Got disconnected fro

ConnectionError: Server Disconnected

nltk time: 100it, 44.9s

misra_greis_v0 time: 100000it [10:36, 157.13it/s]

misra_greis_v1 time: 100000it 39:18

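The problem with misra_greis is that it only counts effectively the words that appear in every text; a word that is missing from even a single text is almost certain to be evicted.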
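
This behaviour is consistent with the standard Misra-Gries guarantee. Assuming a stream of n tokens in total and k-1 counters, as in `misra_greis_v0`, the reported count of a word x with true count f_x satisfies:

```latex
f_x - \frac{n}{k} \;\le\; \hat{f}_x \;\le\; f_x
```

Only words with f_x > n/k are therefore guaranteed to remain in the summary. Each decrement round discards k tokens at once (one from each of the k-1 counters plus the incoming token), so there can be at most n/k rounds, and a word that is rare relative to n/k can be decremented away between two of its occurrences.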

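countmin_v0 flattens the counts so that everything comes out roughly equal, which is badly wrong; it cannot serve as a good estimator and only reflects the order of magnitude of the heavy hitters. It also uses too many hash functions, which hurts efficiency.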
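
One thing worth checking when Count-Min estimates look flat is how the hash values are produced. A `hashlib` object accumulates everything passed to `update()`, so if a single object is reused across tokens, the same word lands in a different bucket every time it appears. The small demonstration below uses hypothetical helper names and is independent of the dataset:

```python
import hashlib

shared = hashlib.md5()

def bucket_shared(token, w=65536):
    # Reuses one hash object: the digest covers every token seen so far.
    shared.update(token.encode("utf-8"))
    return int(shared.hexdigest(), 16) % w

def bucket_fresh(token, w=65536):
    # New hash object per call: the digest depends on this token only.
    return int(hashlib.md5(token.encode("utf-8")).hexdigest(), 16) % w

shared_buckets = [bucket_shared("the") for _ in range(3)]
fresh_buckets = [bucket_fresh("the") for _ in range(3)]
print(shared_buckets)           # buckets of md5("the"), md5("thethe"), md5("thethethe")
print(len(set(fresh_buckets)))  # 1
```

With the shared object, increments for one word are scattered across the whole row, which would spread the counts almost uniformly over the sketch.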

#### Requirement

Analyse the space and time complexity. Record the memory and time consumption of the program.
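
Time and peak memory could be recorded with the standard library alone. The sketch below wraps a function call with `time.perf_counter` and `tracemalloc`; `measure` and `count_words` are hypothetical helpers, and `tracemalloc` only sees allocations made through Python's memory allocator, so the figures are a lower bound on the process footprint.

```python
import time
import tracemalloc

def measure(func, *args, **kwargs):
    """Run func once; return (result, elapsed_seconds, peak_bytes)."""
    tracemalloc.start()
    start = time.perf_counter()
    try:
        result = func(*args, **kwargs)
    finally:
        elapsed = time.perf_counter() - start
        _, peak = tracemalloc.get_traced_memory()
        tracemalloc.stop()
    return result, elapsed, peak

def count_words(texts):
    counts = {}
    for text in texts:
        for word in text.lower().split():
            counts[word] = counts.get(word, 0) + 1
    return counts

result, elapsed, peak = measure(count_words, ["the cat", "the dog"] * 1000)
print(f"{elapsed:.4f} s, peak {peak / 1024:.1f} KiB, {len(result)} distinct words")
```

The same wrapper could be applied to any of the counting functions in this notebook, for instance with a small `max_iter` to keep the run short.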

The following is only a simple example that counts the words in roughly the first 100 texts. (Punctuation tokens are filtered out, but words are not lowercased, so "The" and "the" are counted separately.)

In [None]:
import nltk
from nltk import word_tokenize
import numpy as np

words_all = []
for idx, data in enumerate(dataset):
    words = word_tokenize(data['text'])
    words_all += [w for w in words if not is_punctuation(w)]
    if(idx > 100):
        break
freq_dist = nltk.FreqDist(words_all)

In [None]:
for k in freq_dist:
    print("{} : {}".format(k,freq_dist[k]))

the : 3819
and : 2288
to : 2037
of : 1993
a : 1680
in : 1299
is : 1021
that : 845
for : 797
with : 609
on : 596
you : 560
it : 551
be : 467
are : 463
I : 448
The : 442
as : 432
or : 414
by : 404
was : 379
this : 368
have : 357
not : 328
from : 323
your : 306
at : 298
would : 294
his : 289
an : 279
we : 261
they : 246
their : 243
can : 234
more : 227
but : 225
will : 209
all : 200
one : 199
has : 198
s : 196
our : 177
which : 164
some : 154
may : 149
who : 146
its : 146
life : 146
he : 144
This : 141
out : 140
up : 139
It : 137
about : 135
if : 133
other : 126
do : 126
there : 125
had : 125
were : 125
so : 123
We : 122
time : 122
like : 121
only : 118
than : 117
how : 115
them : 112
been : 112
my : 112
any : 108
A : 107
into : 105
over : 102
what : 102
us : 102
way : 102
when : 101
also : 100
just : 100
In : 99
could : 97
t : 96
use : 95
site : 93
get : 93
new : 91
two : 90
people : 89
very : 88
even : 88
find : 85
no : 84
Earth : 82
many : 81
because : 81
me : 81
If : 80
most : 80
much