In [1]:
%pip install "dask[complete]" #installing dask



In [2]:
import dask.bag as bag #for lazy operations import daskbags
import os
from dask.diagnostics import ProgressBar # to see the progress of the process

In [3]:
raw_text=bag.read_text("/content/drive/MyDrive/foods.txt",encoding='latin1') #read the data

In [4]:
raw_text.count().compute()

5116093

In [5]:
from dask.delayed import delayed

In [6]:
def get_next_buffer_part(file,start_index,span_index=0,blocksize=1000):
    file.seek(start_index)
    buffer=file.read(blocksize+span_index).decode('latin1')
    delimeter_position=buffer.find("\n\n")
    if delimeter_position==-1:
        return get_next_buffer_part(file,start_index,span_index+blocksize)
    else:
        file.seek(start_index)
        return start_index,delimeter_position


In [7]:
with open("/content/drive/MyDrive/foods.txt","rb") as file_handle:
    size = file_handle.seek(0,2) - 1       #Get the total size of the file in bytes
    more_data = True                     
    output = list()
    current_position = next_position = 0
    while more_data:
        if current_position >= size:
            more_data = False
        else:
            current_position,next_position = get_next_buffer_part(file_handle,current_position,0)
            output.append((current_position,next_position))
            current_position = current_position + next_position + 2

In [8]:
def get_dict_item(filename,start_index,delimeter_position,encoding='cp1252'):
    with open(filename,"rb") as file_handle:
        file_handle.seek(start_index)
        text = file_handle.read(delimeter_position).decode(encoding)
        elements = text.strip().split("\n")
        key_value_pairs = [(element.split(": ")[0], element.split(": ")[1])
                          if len(element.split(": ")) > 1
                          else ("unknown",element)
                          for element in elements]
        return dict(key_value_pairs)

In [9]:
reviews = bag.from_sequence(output).map(lambda x: get_dict_item("/content/drive/MyDrive/foods.txt",x[0],x[1]))

In [10]:
reviews.take(3)

({'product/productId': 'B001E4KFG0',
  'review/helpfulness': '1/1',
  'review/profileName': 'delmartian',
  'review/score': '5.0',
  'review/summary': 'Good Quality Dog Food',
  'review/text': 'I have bought several of the Vitality canned dog food products and have found them all to be of good quality. The product looks more like a stew than a processed meat and it smells better. My Labrador is finicky and she appreciates this product better than  most.',
  'review/time': '1303862400',
  'review/userId': 'A3SGXH7AUHU8GW'},
 {'product/productId': 'B00813GRG4',
  'review/helpfulness': '0/0',
  'review/profileName': 'dll pa',
  'review/score': '1.0',
  'review/summary': 'Not as Advertised',
  'review/text': 'Product arrived labeled as Jumbo Salted Peanuts...the peanuts were actually small sized unsalted. Not sure if this was an error or if the vendor intended to represent the product as "Jumbo".',
  'review/time': '1346976000',
  'review/userId': 'A1D87F6ZCVE5NK'},
 {'product/productId': 

In [11]:
def fetch_scores(element):
    numeric_score = float(element['review/score'])
    return numeric_score

In [12]:
review_scores = reviews.map(fetch_scores)

In [13]:
def tag_reviews(element):
    if float(element['review/score']) > 3:
        element['review/score'] = 'pos'
    else:
        element['review/score'] = 'neg'
    return element

In [14]:
reviews = reviews.map(tag_reviews)

In [15]:
!pip install contractions



#importing necessary library for text processing

In [16]:
import pandas as pd                                        #data processing
import numpy as np                                         #linear algebra
from nltk.tokenize import word_tokenize                    #for words tokenization(break the words)
import re                                                  #Regular expressions can contain both special and ordinary characters
import nltk
from contractions import contractions_dict                 # for removing contractions 
from nltk.corpus import stopwords
from spacy.lang.en.stop_words import STOP_WORDS            #provided stop_words by spacy
from itertools import filterfalse 
from nltk import pos_tag
from nltk.stem import WordNetLemmatizer                    #for perfect stamming
from nltk.corpus import wordnet as wn
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.decomposition import PCA

In [17]:
#tokenizing the txt
def text_tokenization(x):
    x['review/text'] = word_tokenize(x['review/text'])
    return x

In [18]:
tokenized_reviews = reviews.map(text_tokenization)

In [19]:
# converting all letter in lower case
def normalize_tokens(review):
    review['review/text'] =  [x.lower() for x in review['review/text']]
    return review

In [20]:
normalized_reviews = tokenized_reviews.map(normalize_tokens)

In [21]:
# for contraction word expansion
def contracted_word_expansion(token):
    if token in contractions_dict.keys():
        return contractions_dict[token]
    else:
        return token

In [22]:
# taking list of token ,doing lazy evaluation using map ,find expansion version of token and give expanded words
def contractions_expansion(review):
    review['review/text'] = list(map(contracted_word_expansion,review['review/text']))
    return review

In [23]:
contracted_reviews = normalized_reviews.map(contractions_expansion)

In [24]:
regex = r'^@[a-zA-z0-9]|^#[a-zA-Z0-9]|\w+:\/{2}[\d\w-]+(\.[\d\w-]+)*(?:(?:\/[^\s/]*))*|\W+|\d+|<("[^"]*"|\'[^\']*\'|[^\'">])*>|_+|[^\u0000-\u007f]+'

In [25]:
#check token that is waste word or not using regex
def waste_word_or_not(token):
    return re.search(regex,token)

In [26]:
#
def filter_waste_words(review):
    review['review/text'] = list(filterfalse(waste_word_or_not,review['review/text']))
    return review

In [27]:
filtered_reviews = contracted_reviews.map(filter_waste_words)

In [28]:
def split(review):
    review['review/text'] = list(map(lambda x: re.split(regex,x)[0],review['review/text']))
    return review

In [29]:
filtered_reviews = filtered_reviews.map(split)

In [30]:
nltk.download("stopwords")

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [31]:
#collection of stop_word available in english provided by nltk and spacy
en_stop_words = list(set(stopwords.words('english')).union(set(STOP_WORDS)))

In [32]:
#check is any token is stopword!! 
def is_stopword(token):
    return not(token in en_stop_words or re.search(r'\b\w\b|[^\u0000-\u007f]+|_+|\W+',token))

In [33]:
#removing stop_words
def stopwords_removal(review):
    review['review/text'] = list(filter(is_stopword,review['review/text']))
    return review

In [34]:
without_stopwords_reviews = filtered_reviews.map(stopwords_removal)

In [35]:
#pos_tags(part_of_speech_tags .i.e. verb,adjective,noun,adverb) 
def get_wnet_pos_tag(treebank_tag):
    wn.ensure_loaded()
    if treebank_tag[1].startswith('J'):
        return (treebank_tag[0],wn.ADJ)
    elif treebank_tag[1].startswith('V'):
        return (treebank_tag[0],wn.VERB)
    elif treebank_tag[1].startswith('N'):
        return (treebank_tag[0],wn.NOUN)
    elif treebank_tag[1].startswith('R'):
        return (treebank_tag[0],wn.ADV)
    else:
        return (treebank_tag[0],wn.NOUN)

In [36]:
def get_pos_tag(review):
    wn.ensure_loaded()
    review['review/text'] = list(map(get_wnet_pos_tag,pos_tag(review['review/text'])))
    return review

In [37]:
tagged_reviews = without_stopwords_reviews.map(get_pos_tag)

In [38]:
nltk.download('wordnet')

[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


True

Lemmatizer Uses pos_tags to perform Lemmitization of words

In [39]:
lemmatizer = WordNetLemmatizer()  
wn.ensure_loaded()

In [40]:
def token_lemmatization(token_pos_tuple):
    wn.ensure_loaded()
    if token_pos_tuple == None:
        return ""
    else:
        return lemmatizer.lemmatize(word=token_pos_tuple[0],pos=token_pos_tuple[1])

In [41]:
def lemmatization(review):
    wn.ensure_loaded()
    if len(review['review/text']) > 0:
        review['review/text'] = list(map(token_lemmatization,review['review/text']))
    else:
        review['review/text'] = [""]
    return review

In [42]:
lemmatized_reviews = tagged_reviews.map(lemmatization)

In [43]:
def extract_tokens(review):
    return review['review/text']

In [44]:
extracted_tokens = lemmatized_reviews.map(extract_tokens)

In [45]:
unique_tokens = extracted_tokens.flatten().distinct()

In [46]:
nltk.download('punkt')

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


True

In [47]:
nltk.download('omw')


[nltk_data] Downloading package omw to /root/nltk_data...
[nltk_data]   Package omw is already up-to-date!


True

In [48]:
#number of unique tokens
with ProgressBar():
    number_of_tokens = unique_tokens.count().compute()

[########################################] | 100% Completed | 48min 37.5s


In [49]:
number_of_tokens

90271

In [50]:
unique_tokens

dask.bag<distinct-aggregate, npartitions=1>

In [51]:
with ProgressBar():
    tokens_index = list(unique_tokens)

[########################################] | 100% Completed | 48min 35.5s


In [59]:
from collections import Counter
from collections import OrderedDict
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.decomposition import PCA
from sklearn.naive_bayes import GaussianNB
from sklearn.metrics import classification_report

In [60]:
def join_tokens(list_of_tokens):
    return " ".join(list_of_tokens)

In [65]:
def compute_tf(review):
    D = dict(Counter(review))
    non_included = set(tokens_index).difference(set(D.keys()))
    D_prime = dict(zip(non_included,list(np.zeros(len(non_included)))))
    D_prime.update(D)
    full_D = dict(OrderedDict(sorted(D_prime.items())))
    print(full_D)
    return np.array(full_D.values())

In [66]:
tf_vectors = extracted_tokens.map(compute_tf)

In [70]:
vectorizer = TfidfVectorizer(vocabulary=unique_tokens)
tf_idf_matrix = vectorizer.fit_transform(unique_tokens)


In [55]:
from dask import array as dask_array

In [56]:
tf_vector_data = tf_vectors.map(lambda x: dask_array.from_array(x).reshape(1,-1)).reduction(
        perpartition=stacker,aggregate=stacker)
tf_vector_data = tf_vector_data.compute()
progress(tf_vector_data)

NameError: ignored

In [71]:
pca = PCA(n_components=5000)


In [72]:
tf_idf_matrix_reduced = pca.fit_transform(tf_idf_matrix)


TypeError: ignored