In [1]:
import dask.bag as bag
import os
from dask import array as dask_array

In [2]:
raw_text = bag.read_text("foods.txt",encoding='cp1252')

In [3]:
raw_text.count().compute()

5116093

In [4]:
from dask.delayed import delayed

In [5]:
def get_next_buffer_part(file,start_index,span_index=0,blocksize=1000):
    file.seek(start_index)
    buffer = file.read(blocksize + span_index).decode('cp1252')
    delimeter_position = buffer.find('\n\n')
    if delimeter_position == -1:
        return get_next_buffer_part(file,start_index,span_index+blocksize)
    else:
        file.seek(start_index)
        return start_index,delimeter_position

In [6]:
with open("foods.txt","rb") as file_handle:
    size = file_handle.seek(0,2) - 1       #Get the total size of the file in bytes
    more_data = True                     
    output = list()
    current_position = next_position = 0
    while more_data:
        if current_position >= size:
            more_data = False
        else:
            current_position,next_position = get_next_buffer_part(file_handle,current_position,0)
            output.append((current_position,next_position))
            current_position = current_position + next_position + 2

In [7]:
def get_dict_item(filename,start_index,delimeter_position,encoding='cp1252'):
    with open(filename,"rb") as file_handle:
        file_handle.seek(start_index)
        text = file_handle.read(delimeter_position).decode(encoding)
        elements = text.strip().split("\n")
        key_value_pairs = [(element.split(": ")[0], element.split(": ")[1])
                          if len(element.split(": ")) > 1
                          else ("unknown",element)
                          for element in elements]
        return dict(key_value_pairs)

In [8]:
reviews = bag.from_sequence(output).map(lambda x: get_dict_item("foods.txt",x[0],x[1]))

In [9]:
def fetch_scores(element):
    numeric_score = float(element['review/score'])
    return numeric_score

In [10]:
review_scores = reviews.map(fetch_scores)

In [11]:
def tag_reviews(element):
    if float(element['review/score']) > 3:
        element['review/score'] = 'pos'
    else:
        element['review/score'] = 'neg'
    return element

In [12]:
reviews = reviews.map(tag_reviews)

In [13]:
import pandas as pd
import numpy as np
from nltk.tokenize import word_tokenize
import re
import nltk
from contractions import contractions_dict
from nltk.corpus import stopwords
from spacy.lang.en.stop_words import STOP_WORDS
from itertools import filterfalse
from nltk import pos_tag
from nltk.stem import WordNetLemmatizer
from nltk.corpus import wordnet as wn
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.decomposition import PCA

In [14]:
def text_tokenization(x):
    x['review/text'] = word_tokenize(x['review/text'])
    return x

In [15]:
tokenized_reviews = reviews.map(text_tokenization)

In [16]:
def normalize_tokens(review):
    review['review/text'] =  [x.lower() for x in review['review/text']]
    return review

In [17]:
normalized_reviews = tokenized_reviews.map(normalize_tokens)

In [18]:
def contracted_word_expansion(token):
    if token in contractions_dict.keys():
        return contractions_dict[token]
    else:
        return token

In [19]:
def contractions_expansion(review):
    review['review/text'] = list(map(contracted_word_expansion,review['review/text']))
    return review

In [20]:
contracted_reviews = normalized_reviews.map(contractions_expansion)

In [21]:
regex = r'^@[a-zA-z0-9]|^#[a-zA-Z0-9]|\w+:\/{2}[\d\w-]+(\.[\d\w-]+)*(?:(?:\/[^\s/]*))*|\W+|\d+|<("[^"]*"|\'[^\']*\'|[^\'">])*>|_+|[^\u0000-\u007f]+'

In [22]:
def waste_word_or_not(token):
    return re.search(regex,token)

In [23]:
def filter_waste_words(review):
    review['review/text'] = list(filterfalse(waste_word_or_not,review['review/text']))
    return review

In [24]:
filtered_reviews = contracted_reviews.map(filter_waste_words)

In [25]:
def split(review):
    review['review/text'] = list(map(lambda x: re.split(regex,x)[0],review['review/text']))
    return review

In [26]:
filtered_reviews = filtered_reviews.map(split)

In [27]:
en_stop_words = list(set(stopwords.words('english')).union(set(STOP_WORDS)))

In [28]:
def is_stopword(token):
    return not(token in en_stop_words or re.search(r'\b\w\b|[^\u0000-\u007f]+|_+|\W+',token))

In [29]:
def stopwords_removal(review):
    review['review/text'] = list(filter(is_stopword,review['review/text']))
    return review

In [30]:
without_stopwords_reviews = filtered_reviews.map(stopwords_removal)

In [31]:
def get_wnet_pos_tag(treebank_tag):
    if treebank_tag[1].startswith('J'):
        return (treebank_tag[0],wn.ADJ)
    elif treebank_tag[1].startswith('V'):
        return (treebank_tag[0],wn.VERB)
    elif treebank_tag[1].startswith('N'):
        return (treebank_tag[0],wn.NOUN)
    elif treebank_tag[1].startswith('R'):
        return (treebank_tag[0],wn.ADV)
    else:
        return (treebank_tag[0],wn.NOUN)

In [32]:
def get_pos_tag(review):
    review['review/text'] = list(map(get_wnet_pos_tag,pos_tag(review['review/text'])))
    return review

In [33]:
tagged_reviews = without_stopwords_reviews.map(get_pos_tag)

In [34]:
lemmatizer = WordNetLemmatizer()

In [35]:
def token_lemmatization(token_pos_tuple):
    if token_pos_tuple == None:
        return ""
    else:
        return lemmatizer.lemmatize(word=token_pos_tuple[0],pos=token_pos_tuple[1])

In [36]:
def lemmatization(review):
    if len(review['review/text']) > 0:
        review['review/text'] = list(map(token_lemmatization,review['review/text']))
    else:
        review['review/text'] = [""]
    return review

In [37]:
lemmatized_reviews = tagged_reviews.map(lemmatization)

In [38]:
def extract_tokens(review):
    return review['review/text']

In [39]:
extracted_tokens = lemmatized_reviews.map(extract_tokens)

In [40]:
def count(accumulator,element):
    return accumulator + 1

In [41]:
def combine(total_1,total_2):
    return total_1 + total_2

In [42]:
from dask.diagnostics import ProgressBar

In [43]:
with ProgressBar():
    token_counts = extracted_tokens.flatten().foldby(lambda x: x,count,0,combine,0).compute()

[########################################] | 100% Completed |  7min 31.7s


In [44]:
token_counts

[('buy', 131865),
 ('vitality', 384),
 ('can', 21968),
 ('dog', 100537),
 ('food', 139285),
 ('product', 169837),
 ('find', 124971),
 ('good', 208769),
 ('quality', 32408),
 ('look', 57297),
 ('like', 264865),
 ('stew', 1541),
 ('process', 7048),
 ('meat', 13200),
 ('smell', 33022),
 ('well', 46718),
 ('labrador', 387),
 ('finicky', 1946),
 ('appreciate', 3972),
 ('arrive', 18409),
 ('labeled', 73),
 ('jumbo', 283),
 ('salt', 29083),
 ('peanut', 19640),
 ('actually', 25006),
 ('small', 41386),
 ('size', 29422),
 ('unsalted', 604),
 ('sure', 29427),
 ('error', 763),
 ('vendor', 2933),
 ('intend', 1977),
 ('represent', 366),
 ('confection', 283),
 ('century', 252),
 ('light', 18167),
 ('pillowy', 10),
 ('citrus', 1443),
 ('gelatin', 744),
 ('nuts', 1211),
 ('case', 20264),
 ('filbert', 30),
 ('cut', 11385),
 ('tiny', 5989),
 ('square', 2913),
 ('liberally', 159),
 ('coat', 7923),
 ('powdered', 653),
 ('sugar', 50763),
 ('mouthful', 315),
 ('heaven', 1927),
 ('chewy', 7175),
 ('flavorful'

In [45]:
top_tokens = sorted(token_counts,key=lambda x:x[1],reverse=True)

In [46]:
top_tokens

[('br', 565453),
 ('like', 264865),
 ('taste', 241140),
 ('good', 208769),
 ('flavor', 190207),
 ('love', 179571),
 ('product', 169837),
 ('coffee', 163682),
 ('great', 162407),
 ('try', 141294),
 ('food', 139285),
 ('tea', 138267),
 ('buy', 131865),
 ('find', 124971),
 ('dog', 100537),
 ('eat', 96046),
 ('time', 93598),
 ('order', 82135),
 ('use', 81242),
 ('price', 80536),
 ('amazon', 79442),
 ('little', 79056),
 ('bag', 78866),
 ('think', 73389),
 ('best', 73235),
 ('get', 72270),
 ('drink', 67716),
 ('store', 64778),
 ('treat', 63192),
 ('day', 61839),
 ('box', 61038),
 ('cup', 60833),
 ('come', 59708),
 ('add', 59680),
 ('cat', 59542),
 ('chocolate', 57937),
 ('look', 57297),
 ('go', 57011),
 ('water', 56158),
 ('year', 55879),
 ('brand', 54716),
 ('want', 54053),
 ('recommend', 53873),
 ('know', 52544),
 ('sugar', 50763),
 ('purchase', 49358),
 ('make', 49316),
 ('sweet', 49282),
 ('way', 48679),
 ('work', 48536),
 ('well', 46718),
 ('thing', 45992),
 ('mix', 44693),
 ('need', 44

In [47]:
top_100_tokens = list(map(lambda x:x[0],top_tokens[:100]))

In [48]:
top_100_tokens

['br',
 'like',
 'taste',
 'good',
 'flavor',
 'love',
 'product',
 'coffee',
 'great',
 'try',
 'food',
 'tea',
 'buy',
 'find',
 'dog',
 'eat',
 'time',
 'order',
 'use',
 'price',
 'amazon',
 'little',
 'bag',
 'think',
 'best',
 'get',
 'drink',
 'store',
 'treat',
 'day',
 'box',
 'cup',
 'come',
 'add',
 'cat',
 'chocolate',
 'look',
 'go',
 'water',
 'year',
 'brand',
 'want',
 'recommend',
 'know',
 'sugar',
 'purchase',
 'make',
 'sweet',
 'way',
 'work',
 'well',
 'thing',
 'mix',
 'need',
 'snack',
 'enjoy',
 'package',
 'pack',
 'small',
 'favorite',
 'chip',
 'bit',
 'lot',
 'nice',
 'give',
 'ingredient',
 'delicious',
 'bar',
 'healthy',
 'free',
 'hot',
 'stuff',
 'strong',
 'bad',
 'easy',
 'review',
 'old',
 'say',
 'right',
 'smell',
 'different',
 'quality',
 'long',
 'perfect',
 'high',
 'milk',
 'organic',
 'take',
 'month',
 'dry',
 'hard',
 'big',
 'sure',
 'size',
 'salt',
 'oil',
 'fresh',
 'green',
 'problem',
 'feel']

In [49]:
def extract_bow_vector(review):
    one_hot_encoded_bow_vector = np.where(np.isin(top_100_tokens,review['review/text']),1,0)
    review['review/text'] = one_hot_encoded_bow_vector
    return review

In [50]:
def prep_model_data(review):
    return {'label' : 1 if review['review/score'] == 'pos' else 0, 'bow_vector':review['review/text']}

In [51]:
model_data = lemmatized_reviews.map(extract_bow_vector).map(prep_model_data)

In [52]:
model_data

dask.bag<prep_model_data, npartitions=101>

In [53]:
model_data.take(1)

({'label': 1,
  'bow_vector': array([0, 1, 0, 1, 0, 0, 1, 0, 0, 0, 1, 0, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])},)

In [54]:
def partition_elements_stacker(partition_content):
    partition_list = list()
    for single_element in partition_content:
        partition_list.append(single_element)
    return dask_array.concatenate(partition_list)

In [55]:
with ProgressBar():
    bow_matrix = model_data.pluck('bow_vector').map(lambda x: dask_array.from_array(x).reshape(1,-1)).reduction(
                perpartition=partition_elements_stacker,aggregate=partition_elements_stacker)
    bow_matrix = bow_matrix.compute()

[########################################] | 100% Completed | 11min  9.0s


In [57]:
bow_matrix

Unnamed: 0,Array,Chunk
Bytes,216.85 MiB,400 B
Shape,"(568454, 100)","(1, 100)"
Count,2992432 Tasks,568454 Chunks
Type,int32,numpy.ndarray
"Array Chunk Bytes 216.85 MiB 400 B Shape (568454, 100) (1, 100) Count 2992432 Tasks 568454 Chunks Type int32 numpy.ndarray",100  568454,

Unnamed: 0,Array,Chunk
Bytes,216.85 MiB,400 B
Shape,"(568454, 100)","(1, 100)"
Count,2992432 Tasks,568454 Chunks
Type,int32,numpy.ndarray


In [59]:
with ProgressBar():
    bow_matrix.to_zarr('bow_matrix.zarr')

[########################################] | 100% Completed | 12min 33.5s


In [None]:
with ProgressBar():
    labels_array = model_data.pluck('review/score').map(lambda x: dask_array.from_array(x).reshape(-1,1)).reduction(
                perpartition=partition_elements_stacker,aggregate=partition_elements_stacker)
    labels_array = labels_array.compute()

In [None]:
with ProgressBar():
    labels_array.to_zarr('labels_array.zarr')