In [1]:
import dask.bag as bag
import os
from dask import array as dask_array
from dask.diagnostics import ProgressBar

In [None]:
# Bag of the file's text lines, read with the cp1252 codec.
raw_text = bag.read_text(
    "foods.txt",
    encoding="cp1252",
)

In [None]:
from dask.delayed import delayed

In [None]:
def get_next_buffer_part(file,start_index,span_index=0,blocksize=1000):
    """Locate the end of the record that starts at byte ``start_index``.

    Records are separated by a blank line, i.e. two consecutive newline
    bytes. The file is read forward in ``blocksize``-byte steps until that
    separator is found or the end of the file is reached.

    Parameters
    ----------
    file : binary file object
        Seekable file opened in ``'rb'`` mode.
    start_index : int
        Byte offset at which the record begins.
    span_index : int, optional
        Extra bytes added to the size of the first read.
    blocksize : int, optional
        Number of bytes fetched by each subsequent read; must be positive.

    Returns
    -------
    tuple of (int, int)
        ``(start_index, length)`` where ``length`` is the number of bytes
        between ``start_index`` and the separator. When the file ends
        before a separator is seen, ``length`` is the number of bytes left,
        so a final record without a trailing blank line is still returned.

    Raises
    ------
    ValueError
        If ``blocksize`` is not positive.
    """
    if blocksize <= 0:
        raise ValueError("blocksize must be a positive number of bytes")

    file.seek(start_index)
    first_read = blocksize + span_index
    read_size = first_read if first_read > 0 else blocksize

    buffer = b''
    record_length = None
    while record_length is None:
        chunk = file.read(read_size)
        if not chunk:
            # End of file: whatever was collected is the last record.
            record_length = len(buffer)
            break
        # Step back one byte so a separator split across two reads is found.
        search_from = max(len(buffer) - 1, 0)
        buffer += chunk
        # Search the raw bytes: offsets stay byte-accurate and no decoding
        # error can be raised while merely looking for the separator.
        delimeter_position = buffer.find(b'\n\n', search_from)
        if delimeter_position != -1:
            record_length = delimeter_position
        read_size = blocksize

    # Leave the handle where the record starts, as the previous version did.
    file.seek(start_index)
    return start_index,record_length

In [None]:
# Walk the file once and collect a (start_offset, length) pair for every record.
with open("foods.txt","rb") as file_handle:
    # seek(0, 2) moves to the end of the file and returns that offset;
    # one is subtracted, so `size` is the offset of the last byte.
    size = file_handle.seek(0,2) - 1
    output = []
    current_position = next_position = 0
    more_data = current_position < size
    while more_data:
        current_position,next_position = get_next_buffer_part(file_handle,current_position,0)
        output.append((current_position,next_position))
        # Jump over the record body and the two-byte blank-line separator.
        current_position = current_position + next_position + 2
        more_data = current_position < size

In [None]:
def get_dict_item(filename,start_index,delimeter_position,encoding='cp1252'):
    """Read one record from ``filename`` and parse it into a dict.

    Parameters
    ----------
    filename : str
        Path of the file to read.
    start_index : int
        Byte offset at which the record starts.
    delimeter_position : int
        Number of bytes to read from ``start_index``.
    encoding : str, optional
        Codec used to decode the bytes that were read.

    Returns
    -------
    dict
        One entry per line of the record. A line is split at its FIRST
        ``": "`` only, so a value that itself contains ``": "`` is kept
        whole. A line without ``": "`` is stored under the key
        ``"unknown"``; as with any repeated key, a later line replaces an
        earlier one.
    """
    with open(filename,"rb") as file_handle:
        file_handle.seek(start_index)
        text = file_handle.read(delimeter_position).decode(encoding)

    key_value_pairs = []
    for element in text.strip().split("\n"):
        key, separator, value = element.partition(": ")
        if separator:
            key_value_pairs.append((key, value))
        else:
            key_value_pairs.append(("unknown", element))
    return dict(key_value_pairs)

In [None]:
# Turn every (start_offset, length) pair into a parsed review dict.
reviews = bag.from_sequence(output).map(
    lambda span: get_dict_item("foods.txt", span[0], span[1])
)

In [None]:
def fetch_scores(element):
    """Return the ``'review/score'`` entry of ``element`` converted to ``float``."""
    return float(element['review/score'])

In [None]:
# Map every parsed review to its numeric score (see fetch_scores).
review_scores = reviews.map(fetch_scores)

In [None]:
def tag_reviews(element):
    """Overwrite ``element['review/score']`` with a sentiment tag.

    A score strictly greater than 3 becomes ``'pos'``; anything else becomes
    ``'neg'``. The dict is modified in place and the same object is returned.
    """
    is_positive = float(element['review/score']) > 3
    element['review/score'] = 'pos' if is_positive else 'neg'
    return element

In [None]:
# Rebind `reviews` to the bag with tag_reviews applied.
# NOTE(review): running this cell a second time stacks another tag_reviews on
# top; its float() call would then receive 'pos'/'neg' — run cells once, in order.
reviews = reviews.map(tag_reviews)

In [None]:
import pandas as pd
import numpy as np
from nltk.tokenize import word_tokenize
import re
import nltk
from contractions import contractions_dict
from nltk.corpus import stopwords
from spacy.lang.en.stop_words import STOP_WORDS
from itertools import filterfalse
from nltk import pos_tag
from nltk.stem import WordNetLemmatizer
from nltk.corpus import wordnet as wn
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.decomposition import PCA

In [None]:
def text_tokenization(x):
    """Replace ``x['review/text']`` with the output of ``word_tokenize`` on it.

    The dict is updated in place and returned.
    """
    review_body = x['review/text']
    x['review/text'] = word_tokenize(review_body)
    return x

In [None]:
# Apply text_tokenization to every review in the bag.
tokenized_reviews = reviews.map(text_tokenization)

In [None]:
def normalize_tokens(review):
    """Lower-case every token in ``review['review/text']`` (in place) and return ``review``."""
    lowered_tokens = []
    for token in review['review/text']:
        lowered_tokens.append(token.lower())
    review['review/text'] = lowered_tokens
    return review

In [None]:
# Apply normalize_tokens (lower-casing) to every tokenized review.
normalized_reviews = tokenized_reviews.map(normalize_tokens)

In [None]:
def contracted_word_expansion(token):
    """Return the ``contractions_dict`` entry for ``token``, or ``token`` itself when it has none."""
    return contractions_dict.get(token, token)

In [None]:
def contractions_expansion(review):
    """Run ``contracted_word_expansion`` over each token of ``review['review/text']``.

    The token list is replaced in place and ``review`` is returned.
    """
    review['review/text'] = [
        contracted_word_expansion(token) for token in review['review/text']
    ]
    return review

In [None]:
# Apply contractions_expansion to every normalized review.
contracted_reviews = normalized_reviews.map(contractions_expansion)

In [None]:
# Pattern for "waste" tokens; the alternatives are tried left to right.
# The first character class previously read `A-z`, a range that also admits
# the punctuation characters between 'Z' and 'a'; it is now `A-Z`.
regex = (
    r'^@[a-zA-Z0-9]'                                       # starts with @ + alphanumeric
    r'|^#[a-zA-Z0-9]'                                      # starts with # + alphanumeric
    r'|\w+:\/{2}[\d\w-]+(\.[\d\w-]+)*(?:(?:\/[^\s/]*))*'   # scheme://host/path
    r'|\W+'                                                # any non-word characters
    r'|\d+'                                                # any digits
    r'|<("[^"]*"|\'[^\']*\'|[^\'">])*>'                    # <...> markup tag
    r'|_+'                                                 # underscores
    r'|[^\u0000-\u007f]+'                                  # characters outside U+0000-U+007F
)

In [None]:
def waste_word_or_not(token):
    """Search ``token`` for the module-level ``regex``.

    Returns the ``re.search`` result: a match object when any part of the
    token matches, otherwise ``None``.
    """
    match = re.search(regex, token)
    return match

In [None]:
def filter_waste_words(review):
    """Keep only the tokens for which ``waste_word_or_not`` is falsy.

    ``review['review/text']`` is replaced in place; ``review`` is returned.
    """
    review['review/text'] = [
        token for token in review['review/text'] if not waste_word_or_not(token)
    ]
    return review

In [None]:
# Apply filter_waste_words to every review.
filtered_reviews = contracted_reviews.map(filter_waste_words)

In [None]:
def split(review):
    """Cut every token at the first match of ``regex`` and keep the leading piece.

    ``review['review/text']`` is replaced in place; ``review`` is returned.
    """
    review['review/text'] = [
        re.split(regex, token)[0] for token in review['review/text']
    ]
    return review

In [None]:
# Rebind `filtered_reviews` to the bag with split() applied on top.
filtered_reviews = filtered_reviews.map(split)

In [None]:
# Union of the NLTK and spaCy English stop-word collections.
# Kept as a frozenset rather than a list: is_stopword tests
# `token in en_stop_words` for every token, and set membership avoids a
# linear scan per lookup.
en_stop_words = frozenset(stopwords.words('english')) | frozenset(STOP_WORDS)

In [None]:
def is_stopword(token):
    """Tell whether ``token`` should be KEPT.

    Despite the name, the result is ``True`` when the token is not in
    ``en_stop_words`` and the pattern below finds nothing in it; otherwise
    ``False``. It is used as the predicate of ``filter``.
    """
    if token in en_stop_words:
        return False
    return not re.search(r'\b\w\b|[^\u0000-\u007f]+|_+|\W+',token)

In [None]:
def stopwords_removal(review):
    """Keep only the tokens for which ``is_stopword`` returns a truthy value.

    ``review['review/text']`` is replaced in place; ``review`` is returned.
    """
    review['review/text'] = [
        token for token in review['review/text'] if is_stopword(token)
    ]
    return review

In [None]:
# Apply stopwords_removal to every review.
without_stopwords_reviews = filtered_reviews.map(stopwords_removal)

In [None]:
def get_wnet_pos_tag(treebank_tag):
    """Convert a ``(token, tag)`` pair into ``(token, wordnet_pos)``.

    The first letter of the tag selects the WordNet constant: ``J`` -> ADJ,
    ``V`` -> VERB, ``N`` -> NOUN, ``R`` -> ADV. Any other tag falls back to
    NOUN.
    """
    token, tag = treebank_tag[0], treebank_tag[1]
    prefix_to_pos = (
        ('J', wn.ADJ),
        ('V', wn.VERB),
        ('N', wn.NOUN),
        ('R', wn.ADV),
    )
    for prefix, wordnet_pos in prefix_to_pos:
        if tag.startswith(prefix):
            return (token, wordnet_pos)
    return (token, wn.NOUN)

In [None]:
def get_pos_tag(review):
    """Tag the tokens with ``pos_tag`` and convert each pair via ``get_wnet_pos_tag``.

    ``review['review/text']`` is replaced in place by the resulting list of
    pairs; ``review`` is returned.
    """
    tagged_tokens = pos_tag(review['review/text'])
    review['review/text'] = [get_wnet_pos_tag(pair) for pair in tagged_tokens]
    return review

In [None]:
# Apply get_pos_tag to every review.
tagged_reviews = without_stopwords_reviews.map(get_pos_tag)

In [None]:
# Module-level lemmatizer instance used by token_lemmatization.
lemmatizer = WordNetLemmatizer()

In [None]:
def token_lemmatization(token_pos_tuple):
    """Lemmatize one ``(token, pos)`` pair with the module-level ``lemmatizer``.

    Parameters
    ----------
    token_pos_tuple : tuple or None
        Pair whose first item is passed as ``word`` and second as ``pos``.

    Returns
    -------
    The lemmatizer's result, or ``""`` when ``token_pos_tuple`` is ``None``.
    """
    # Identity test: the idiomatic None check, unaffected by custom __eq__.
    if token_pos_tuple is None:
        return ""
    return lemmatizer.lemmatize(word=token_pos_tuple[0],pos=token_pos_tuple[1])

In [None]:
def lemmatization(review):
    """Lemmatize every pair in ``review['review/text']`` via ``token_lemmatization``.

    An empty list is replaced by ``[""]``. The dict is updated in place and
    returned.
    """
    tagged_tokens = review['review/text']
    if len(tagged_tokens) == 0:
        review['review/text'] = [""]
        return review
    review['review/text'] = [token_lemmatization(pair) for pair in tagged_tokens]
    return review

In [None]:
# Apply lemmatization to every POS-tagged review.
lemmatized_reviews = tagged_reviews.map(lemmatization)

In [None]:
def extract_tokens(review):
    """Return the value stored under ``'review/text'`` in ``review``."""
    tokens = review['review/text']
    return tokens

In [None]:
# Reduce every review to just its 'review/text' value.
extracted_tokens = lemmatized_reviews.map(extract_tokens)

In [None]:
def count(accumulator,element):
    """Return ``accumulator`` increased by one; ``element`` is not inspected."""
    incremented = accumulator + 1
    return incremented

In [None]:
def combine(total_1,total_2):
    """Return the sum of the two totals."""
    merged_total = total_1 + total_2
    return merged_total

In [None]:
from dask.distributed import Client

In [None]:
# Create a dask.distributed Client, passing processes=None.
# NOTE(review): presumably this leaves the process/thread choice to the
# library default — confirm against the distributed documentation.
client = Client(processes=None)

In [None]:
from dask.diagnostics import ProgressBar

In [None]:
with ProgressBar():
    # Count occurrences of every token across the flattened token stream.
    # `combine` is passed explicitly: without it foldby reuses the binop
    # (`count`) to merge per-partition results, which adds 1 per partial
    # result instead of summing the partial totals.
    # NOTE(review): despite the variable name, the key is the token itself,
    # so these are corpus-wide counts, not per-document counts.
    token_counts_per_document = extracted_tokens.flatten().foldby(lambda x: x,count,0,combine,0).compute()

In [None]:
with ProgressBar():
    # Group the flattened tokens by their own value; `count` tallies within a
    # partition and `combine` adds the partial tallies together.
    token_counts = (
        extracted_tokens
        .flatten()
        .foldby(
            key=lambda token: token,
            binop=count,
            initial=0,
            combine=combine,
            combine_initial=0,
        )
        .compute()
    )

In [None]:
# (token, count) pairs ordered from the highest count to the lowest.
top_tokens = list(token_counts)
top_tokens.sort(key=lambda pair: pair[1], reverse=True)

In [None]:
# First item (the token) of each of the first 100 entries of top_tokens.
top_100_tokens = [pair[0] for pair in top_tokens[:100]]

In [None]:
def extract_bow_vector(review):
    """Replace ``review['review/text']`` with a 0/1 vector over ``top_100_tokens``.

    Position ``i`` is 1 when ``top_100_tokens[i]`` occurs among the review's
    tokens and 0 otherwise. The dict is updated in place and returned.
    """
    token_present = np.isin(top_100_tokens, review['review/text'])
    review['review/text'] = np.where(token_present, 1, 0)
    return review

In [None]:
def prep_model_data(review):
    """Build the model record for one review.

    Returns a dict with ``'label'`` (1 when ``review['review/score']`` equals
    ``'pos'``, else 0) and ``'bow_vector'`` (the review's ``'review/text'``
    value).
    """
    if review['review/score'] == 'pos':
        label = 1
    else:
        label = 0
    return {'label': label, 'bow_vector': review['review/text']}

In [None]:
# Vectorize each lemmatized review, then reduce it to {'label', 'bow_vector'}.
model_data = lemmatized_reviews.map(extract_bow_vector).map(prep_model_data)

In [None]:
# Bare expression: shows the notebook repr of model_data.
model_data

In [None]:
def partition_elements_stacker(partition_content):
    """Collect every item of ``partition_content`` and concatenate them.

    The items are gathered into a list, in iteration order, and handed to
    ``dask_array.concatenate``; its result is returned.
    """
    return dask_array.concatenate(list(partition_content))

In [None]:
with ProgressBar():
    # Wrap each bow vector as a dask array reshaped to a single row, then
    # concatenate the rows within each partition and finally across
    # partitions using partition_elements_stacker for both steps.
    bow_matrix = model_data.pluck('bow_vector').map(lambda x: dask_array.from_array(x,chunks=500).reshape(1,-1)).reduction(
                perpartition=partition_elements_stacker,aggregate=partition_elements_stacker)
    # Evaluate the bag reduction. NOTE(review): the later `.rechunk(...)` call
    # suggests the computed result is itself a dask array — confirm.
    bow_matrix = bow_matrix.compute()

In [None]:
# Bare expression: shows the notebook repr of bow_matrix.
bow_matrix

In [None]:
with ProgressBar():
    # Re-chunk with chunk size 500 and write the result to 'bow_matrix.zarr'.
    bow_matrix.rechunk(500).to_zarr('bow_matrix.zarr')

In [None]:
with ProgressBar():
    # Same construction as for the feature matrix: each label is wrapped as a
    # dask array reshaped to one column, and the pieces are concatenated per
    # partition and then across partitions.
    labels_array = model_data.pluck('label').map(lambda x: dask_array.from_array(x,chunks=500).reshape(-1,1)).reduction(
                perpartition=partition_elements_stacker,aggregate=partition_elements_stacker)
    # Evaluate the bag reduction.
    labels_array = labels_array.compute()

In [None]:
with ProgressBar():
    # Re-chunk with chunk size 500 and write the result to 'labels_array.zarr'.
    labels_array.rechunk(500).to_zarr('labels_array.zarr')

In [2]:
from sklearn.svm import SVC

In [3]:
with ProgressBar():
    # Open the stored feature matrix from 'bow_matrix.zarr' as a dask array.
    features = dask_array.from_zarr('bow_matrix.zarr')

In [4]:
# Bare expression: shows the array summary rendered below.
features

Unnamed: 0,Array,Chunk
Bytes,216.85 MiB,195.31 kiB
Shape,"(568454, 100)","(500, 100)"
Count,1138 Tasks,1137 Chunks
Type,int32,numpy.ndarray
"Array Chunk Bytes 216.85 MiB 195.31 kiB Shape (568454, 100) (500, 100) Count 1138 Tasks 1137 Chunks Type int32 numpy.ndarray",100  568454,

Unnamed: 0,Array,Chunk
Bytes,216.85 MiB,195.31 kiB
Shape,"(568454, 100)","(500, 100)"
Count,1138 Tasks,1137 Chunks
Type,int32,numpy.ndarray


In [5]:
with ProgressBar():
    # Open the stored labels from 'labels_array.zarr' as a dask array.
    labels = dask_array.from_zarr('labels_array.zarr')

In [6]:
# Bare expression: shows the array summary rendered below.
labels

Unnamed: 0,Array,Chunk
Bytes,2.17 MiB,1.95 kiB
Shape,"(568454, 1)","(500, 1)"
Count,1138 Tasks,1137 Chunks
Type,int32,numpy.ndarray
"Array Chunk Bytes 2.17 MiB 1.95 kiB Shape (568454, 1) (500, 1) Count 1138 Tasks 1137 Chunks Type int32 numpy.ndarray",1  568454,

Unnamed: 0,Array,Chunk
Bytes,2.17 MiB,1.95 kiB
Shape,"(568454, 1)","(500, 1)"
Count,1138 Tasks,1137 Chunks
Type,int32,numpy.ndarray


In [7]:
# Model inputs: X is the feature matrix as loaded.
X = features
# The labels were displayed above with shape (568454, 1); flatten() makes them 1-D.
y = labels.flatten()

In [8]:
from dask_ml.model_selection import train_test_split

In [9]:
# Split features and labels into train and test parts with a fixed seed.
# The second target was previously misspelled `X_text`; it holds the test
# features and is now named `X_test`, matching `y_test`.
X_train,X_test,y_train,y_test = train_test_split(X,y,random_state=42)

In [None]:
from dask_ml.linear_model import LogisticRegression

In [None]:
# dask_ml logistic regression estimator with default settings.
logistic_reg_obj = LogisticRegression()

In [None]:
# Fit the logistic regression on the training split.
logistic_reg_obj.fit(X_train,y_train)

In [10]:
# scikit-learn support vector classifier with default settings.
svm_obj = SVC()

In [11]:
from dask.distributed import Client

In [12]:
# NOTE(review): a Client is also created by an earlier cell with the same
# argument; running both cells in one kernel creates two — confirm whether
# that is intended.
client = Client(processes=None)

In [None]:
# NOTE(review): unfinished statement — it stops after `sklearn.utils.` and is
# a SyntaxError as written. The intended import cannot be determined from this
# notebook; complete or delete this cell before running all cells.
from sklearn.utils.

In [None]:
# NOTE(review): unfinished `with` statement — the context-manager expression
# stops after `sklearn.` and there is no colon, so this cell is a SyntaxError
# as written. The body fits svm_obj on the training split; decide which
# context manager (if any) was intended before running.
with sklearn.
    svm_obj.fit(X_train,y_train)