<a href="https://colab.research.google.com/github/Walidsati/AAI614_Walid_sati/blob/main/Week%207/Notebook7.3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# AAI614: Data Science & its Applications

*Notebook 7.3: Dask-ML*

<a href="https://colab.research.google.com/github/harmanani/AAI614/blob/main/Week%207/Notebook7.3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Source: Data Science with Python and Dask

In [3]:
import dask.bag as bag
import os
from dask.diagnostics import ProgressBar

# Get the foods dataset from https://snap.stanford.edu/data/web-FineFoods.html
raw_data = bag.read_text('foods.txt')



def get_next_part(file, start_index, span_index=0, blocksize=1024):
    file.seek(start_index)
    # Increase the initial blocksize to potentially capture the delimiter
    buffer = file.read(blocksize * 4 + span_index).decode('cp1252', errors='ignore')
    delimiter_position = buffer.find('\n\n')
    if delimiter_position == -1 and start_index + blocksize * 4 + span_index < file.seek(0, 2):
        # Only recurse if the end of file has not been reached
        return get_next_part(file, start_index, span_index + blocksize * 4)
    else:
        file.seek(start_index)
        return start_index, delimiter_position if delimiter_position != -1 else len(buffer)

# ... (rest of the code remains the same)

def get_item(filename, start_index, delimiter_position, encoding='cp1252'):
    with open(filename, 'rb') as file_handle:
        file_handle.seek(start_index)
        text = file_handle.read(delimiter_position).decode(encoding)
        elements = text.strip().split('\n')
        key_value_pairs = [(element.split(': ')[0], element.split(': ')[1])
                               if len(element.split(': ')) > 1
                               else ('unknown', element)
                               for element in elements]
        return dict(key_value_pairs)

with open('foods.txt', 'rb') as file_handle:
    size = file_handle.seek(0,2) - 1
    more_data = True
    output = []
    current_position = next_position = 0
    while more_data:
        if current_position >= size:
            more_data = False
        else:
            current_position, next_position = get_next_part(file_handle, current_position, 0)
            output.append((current_position, next_position))
            current_position = current_position + next_position + 2

reviews = bag.from_sequence(output).map(lambda x: get_item('foods.txt', x[0], x[1]))

def tag_positive_negative_by_score(element):
    if float(element['review/score']) > 3:
        element['review/sentiment'] = 'positive'
    else:
        element['review/sentiment'] = 'negative'
    return element

tagged_reviews = reviews.map(tag_positive_negative_by_score)

In [4]:
from nltk.corpus import stopwords
from nltk.tokenize import RegexpTokenizer
from functools import partial

tokenizer = RegexpTokenizer(r'\w+')

def extract_reviews(element):
    element['review/tokens'] = element['review/text'].lower()
    return element

def tokenize_reviews(element):
    element['review/tokens'] = tokenizer.tokenize(element['review/tokens'])
    return element

def filter_stopword(word, stopwords):
    return word not in stopwords

def filter_stopwords(element, stopwords):
    element['review/tokens'] = list(filter(partial(filter_stopword, stopwords=stopwords), element['review/tokens']))
    return element

stopword_set = set(stopwords.words('english'))
more_stopwords = {'br', 'amazon', 'com', 'http', 'www', 'href', 'gp'}
all_stopwords = stopword_set.union(more_stopwords)

review_extracted_text = tagged_reviews.map(extract_reviews)
review_tokens = review_extracted_text.map(tokenize_reviews)
review_text_clean = review_tokens.map(partial(filter_stopwords, stopwords=all_stopwords))

LookupError: 
**********************************************************************
  Resource [93mstopwords[0m not found.
  Please use the NLTK Downloader to obtain the resource:

  [31m>>> import nltk
  >>> nltk.download('stopwords')
  [0m
  For more information see: https://www.nltk.org/data.html

  Attempted to load [93mcorpora/stopwords[0m

  Searched in:
    - '/root/nltk_data'
    - '/usr/nltk_data'
    - '/usr/share/nltk_data'
    - '/usr/lib/nltk_data'
    - '/usr/share/nltk_data'
    - '/usr/local/share/nltk_data'
    - '/usr/lib/nltk_data'
    - '/usr/local/lib/nltk_data'
**********************************************************************


In [None]:
def extract_tokens(element):
    return element['review/tokens']

extracted_tokens = review_text_clean.map(extract_tokens)
unique_tokens = extracted_tokens.flatten().distinct()

with ProgressBar():
    number_of_tokens = unique_tokens.count().compute()
number_of_tokens

[########################################] | 100% Completed | 34.8s


114290

In [None]:
# Listing 10.4
def count(accumulator, element):
    return accumulator + 1

def combine(total_1, total_2):
    return total_1 + total_2

with ProgressBar():
    token_counts = extracted_tokens.flatten().foldby(lambda x: x, count, 0, combine, 0).compute()

top_tokens = sorted(token_counts, key=lambda x: x[1], reverse=True)
top_100_tokens = list(map(lambda x: x[0], top_tokens[:100]))

[########################################] | 100% Completed | 49.4s


In [None]:
import numpy as np
def vectorize_tokens(element):
    vectorized_tokens = np.where(np.isin(top_100_tokens, element['review/tokens']), 1, 0)
    element['review/token_vector'] = vectorized_tokens
    return element

def prep_model_data(element):
    return {'target': 1 if element['review/sentiment'] == 'positive' else 0,
            'features': element['review/token_vector']}

model_data = review_text_clean.map(vectorize_tokens).map(prep_model_data)

model_data.take(5)

({'target': 1,
  'features': array([1, 1, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1,
         0, 0, 0, 0, 0, 0, 0, 0])},
 {'target': 0,
  'features': array([0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 0, 0, 0])},
 {'target': 1,
  'features': array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0

In [None]:
from dask import array as dask_array
def stacker(partition):
    return dask_array.concatenate([element for element in partition])

with ProgressBar():
    feature_arrays = model_data.pluck('features').map(lambda x: dask_array.from_array(x, 1000).reshape(1,-1)).reduction(perpartition=stacker, aggregate=stacker)
    feature_array = feature_arrays.compute()
feature_array

In [None]:
with ProgressBar():
    feature_array.rechunk(5000).to_zarr('sentiment_feature_array.zarr')
    feature_array = dask_array.from_zarr('sentiment_feature_array.zarr')

with ProgressBar():
    target_arrays = model_data.pluck('target').map(lambda x: dask_array.from_array(x, 1000).reshape(-1,1)).reduction(perpartition=stacker, aggregate=stacker)
    target_arrays.compute().rechunk(5000).to_zarr('sentiment_target_array.zarr')
    target_array = dask_array.from_zarr('sentiment_target_array.zarr')

[########################################] | 100% Completed |  5min 32.8s


In [None]:
from dask_ml.linear_model import LogisticRegression
from dask_ml.model_selection import train_test_split

X = feature_array
y = target_array.flatten()

X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)

lr = LogisticRegression()

with ProgressBar():
    lr.fit(X_train, y_train)

[########################################] | 100% Completed |  0.9s
[####################                    ] | 50% Completed |  1.6s

  return np.exp(A)


[########################################] | 100% Completed |  3.6s
[########################################] | 100% Completed |  3.8s
[########################################] | 100% Completed |  3.9s
[########################################] | 100% Completed |  3.8s
[########################################] | 100% Completed |  3.6s
[########################################] | 100% Completed |  3.9s
[########################################] | 100% Completed |  3.7s
[########################################] | 100% Completed |  3.5s
[########################################] | 100% Completed |  3.7s
[########################################] | 100% Completed |  4.0s
[########################################] | 100% Completed |  4.0s
[########################################] | 100% Completed |  4.0s
[########################################] | 100% Completed |  3.8s
[########################################] | 100% Completed |  3.9s
[########################################] | 100

In [None]:
lr.score(X_test, y_test).compute()

0.79629173556626676

In [None]:
from sklearn.naive_bayes import BernoulliNB
from dask_ml.wrappers import Incremental

nb = BernoulliNB()

parallel_nb = Incremental(nb)

with ProgressBar():
    parallel_nb.fit(X_train, y_train, classes=[0,1])

[########################################] | 100% Completed |  2.1s


In [None]:
parallel_nb.score(X_test, y_test)

0.78886817014389754

In [None]:
from dask_ml.model_selection import GridSearchCV

parameters = {'penalty': ['l1', 'l2'], 'C': [0.5, 1, 2]}

lr = LogisticRegression()
tuned_lr = GridSearchCV(lr, parameters)

with ProgressBar():
    tuned_lr.fit(X_train, y_train)

[########################################] | 100% Completed | 23min 24.1s


In [None]:
import pandas as pd
pd.DataFrame(tuned_lr.cv_results_)



Unnamed: 0,params,mean_fit_time,std_fit_time,mean_score_time,std_score_time,split0_test_score,split1_test_score,split2_test_score,mean_test_score,std_test_score,rank_test_score,split0_train_score,split1_train_score,split2_train_score,mean_train_score,std_train_score,param_C,param_penalty
0,"{'C': 0.5, 'penalty': 'l1'}",1308.978919,11.548624,0.347088,0.04454,0.790291,0.793938,0.797087,0.793772,0.002777,4,0.795671,0.794152,0.792604,0.794142,0.001252,0.5,l1
1,"{'C': 0.5, 'penalty': 'l2'}",143.865403,2.276777,0.626723,0.145728,0.790801,0.793715,0.796987,0.793834,0.002527,1,0.796081,0.794008,0.792264,0.794118,0.00156,0.5,l2
2,"{'C': 1, 'penalty': 'l1'}",1211.649146,72.024862,0.639021,0.275957,0.790689,0.793551,0.796559,0.7936,0.002397,6,0.796014,0.793724,0.792182,0.793973,0.001574,1.0,l1
3,"{'C': 1, 'penalty': 'l2'}",74.962411,1.968621,0.55358,0.068979,0.790801,0.793715,0.796987,0.793834,0.002527,1,0.796081,0.794008,0.792267,0.794119,0.001559,1.0,l2
4,"{'C': 2, 'penalty': 'l1'}",608.802576,58.226398,0.31594,0.122815,0.790701,0.793592,0.796835,0.793709,0.002505,5,0.79602,0.793829,0.792255,0.794035,0.001544,2.0,l1
5,"{'C': 2, 'penalty': 'l2'}",101.755454,7.513333,0.553664,0.067346,0.790801,0.793715,0.796987,0.793834,0.002527,1,0.796081,0.794008,0.792267,0.794119,0.001559,2.0,l2


In [None]:
import dill
with open('naive_bayes_model.pkl', 'wb') as file:
    dill.dump(parallel_nb, file)

In [None]:
with open('naive_bayes_model.pkl', 'rb') as file:
    nb = dill.load(file)
nb.predict(np.random.randint(0,2,(100,100)))

array([1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1,
       1, 1, 1, 1, 1, 1, 1, 1])