Download the dataset fine foods

In [2]:

import urllib
print("- Downloading Fine Foods dataset... ", end='', flush=True)
url = "https://snap.stanford.edu/data/finefoods.txt.gz"
filename, headers = urllib.request.urlretrieve(url, '../data/raw/finefoods.txt.gz')
print("Done!", flush=True)

- Downloading Fine Foods dataset... Done!


In [10]:
import gzip

with open(filename, mode='rb') as foodscom, open('../data/raw/foods.txt','w',encoding='cp1252') as foods:
  foods.write(gzip.decompress(foodscom.read()).decode('cp1252'))

print("done!")

done!


In [11]:
# Listing 10.1
import dask.bag as bag
# import os
from dask.diagnostics import ProgressBar

# os.chdir('/Users/richard/Documents')
raw_data = bag.read_text('../data/raw/foods.txt')

def get_next_part(file, start_index, span_index=0, blocksize=1024):
    file.seek(start_index)
    buffer = file.read(blocksize + span_index).decode('cp1252')
    delimiter_position = buffer.find('\n\n')
    if delimiter_position == -1:
        return get_next_part(file, start_index, span_index + blocksize)
    else:
        file.seek(start_index)
        return start_index, delimiter_position
    
def get_item(filename, start_index, delimiter_position, encoding='cp1252'):
    with open(filename, 'rb') as file_handle:
        file_handle.seek(start_index)
        text = file_handle.read(delimiter_position).decode(encoding)
        elements = text.strip().split('\n')
        key_value_pairs = [(element.split(': ')[0], element.split(': ')[1]) 
                               if len(element.split(': ')) > 1 
                               else ('unknown', element) 
                               for element in elements]
        return dict(key_value_pairs)
    
with open('../data/raw/foods.txt', 'rb') as file_handle:
    size = file_handle.seek(0,2) - 1
    more_data = True
    output = []
    current_position = next_position = 0
    while more_data:
        if current_position >= size:
            more_data = False
        else:
            current_position, next_position = get_next_part(file_handle, current_position, 0)
            output.append((current_position, next_position))
            current_position = current_position + next_position + 2
            
reviews = bag.from_sequence(output).map(lambda x: get_item('../data/raw/foods.txt', x[0], x[1]))

def tag_positive_negative_by_score(element):
    if float(element['review/score']) > 3:
        element['review/sentiment'] = 'positive'
    else:
        element['review/sentiment'] = 'negative'
    return element

tagged_reviews = reviews.map(tag_positive_negative_by_score)

In [2]:
import nltk
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to
[nltk_data]     /home/serendipita/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

tagged_reviews

In [3]:
# Listing 10.2
from nltk.corpus import stopwords
from nltk.tokenize import RegexpTokenizer
from functools import partial

tokenizer = RegexpTokenizer(r'\w+')

def extract_reviews(element):
    element['review/tokens'] = element['review/text'].lower()
    return element

def tokenize_reviews(element):
    element['review/tokens'] = tokenizer.tokenize(element['review/tokens'])
    return element

def filter_stopword(word, stopwords):
    return word not in stopwords

def filter_stopwords(element, stopwords):
    element['review/tokens'] = list(filter(partial(filter_stopword, stopwords=stopwords), element['review/tokens']))
    return element

stopword_set = set(stopwords.words('english'))
more_stopwords = {'br', 'amazon', 'com', 'http', 'www', 'href', 'gp'}
all_stopwords = stopword_set.union(more_stopwords)

review_extracted_text = tagged_reviews.map(extract_reviews)
review_tokens = review_extracted_text.map(tokenize_reviews)
review_text_clean = review_tokens.map(partial(filter_stopwords, stopwords=all_stopwords))

In [4]:
# Listing 10.3
def extract_tokens(element):
    return element['review/tokens']

extracted_tokens = review_text_clean.map(extract_tokens)
unique_tokens = extracted_tokens.flatten().distinct()

with ProgressBar():
    number_of_tokens = unique_tokens.count().compute()
number_of_tokens

[########################################] | 100% Completed | 24.4s


114290

In [5]:
# Listing 10.4
def count(accumulator, element):
    return accumulator + 1

def combine(total_1, total_2):
    return total_1 + total_2

with ProgressBar():
    token_counts = extracted_tokens.flatten().foldby(lambda x: x, count, 0, combine, 0).compute()
    
top_tokens = sorted(token_counts, key=lambda x: x[1], reverse=True)
top_100_tokens = list(map(lambda x: x[0], top_tokens[:100]))

[########################################] | 100% Completed | 33.7s


In [6]:
# Listing 10.5
import numpy as np
def vectorize_tokens(element):
    vectorized_tokens = np.where(np.isin(top_100_tokens, element['review/tokens']), 1, 0)
    element['review/token_vector'] = vectorized_tokens
    return element

def prep_model_data(element):
    return {'target': 1 if element['review/sentiment'] == 'positive' else 0,
            'features': element['review/token_vector']}

model_data = review_text_clean.map(vectorize_tokens).map(prep_model_data)

model_data.take(5)

({'target': 1,
  'features': array([1, 1, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0, 0,
         0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0])},
 {'target': 0,
  'features': array([0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])},
 {'target': 1,
  'features': array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0

In [7]:
# Listing 10.6
from dask import array as dask_array
def stacker(partition):
    return dask_array.concatenate([element for element in partition])

with ProgressBar():
    feature_arrays = model_data.pluck('features').map(lambda x: dask_array.from_array(x, 1000).reshape(1,-1)).reduction(perpartition=stacker, aggregate=stacker)
    feature_array = feature_arrays.compute()
feature_array

[########################################] | 100% Completed | 14min 13.1s


Unnamed: 0,Array,Chunk
Bytes,433.70 MiB,800 B
Shape,"(568454, 100)","(1, 100)"
Count,2979350 Tasks,568454 Chunks
Type,int64,numpy.ndarray
"Array Chunk Bytes 433.70 MiB 800 B Shape (568454, 100) (1, 100) Count 2979350 Tasks 568454 Chunks Type int64 numpy.ndarray",100  568454,

Unnamed: 0,Array,Chunk
Bytes,433.70 MiB,800 B
Shape,"(568454, 100)","(1, 100)"
Count,2979350 Tasks,568454 Chunks
Type,int64,numpy.ndarray


In [8]:
# Listing 10.7
with ProgressBar():
    feature_array.rechunk(5000).to_zarr('sentiment_feature_array.zarr')
    feature_array = dask_array.from_zarr('sentiment_feature_array.zarr')
    
with ProgressBar():
    target_arrays = model_data.pluck('target').map(lambda x: dask_array.from_array(x, 1000).reshape(-1,1)).reduction(perpartition=stacker, aggregate=stacker)
    target_arrays.compute().rechunk(5000).to_zarr('sentiment_target_array.zarr')
    target_array = dask_array.from_zarr('sentiment_target_array.zarr')

[########################################] | 100% Completed |  5min 19.5s
[########################################] | 100% Completed |  3min 45.0s
[########################################] | 100% Completed |  3min  0.9s


In [11]:
# test at the moment added recently
import dask.bag as bag
from dask import array as dask_array
from dask.diagnostics import ProgressBar
with ProgressBar():
    feature_array = dask_array.from_zarr('sentiment_feature_array.zarr')
    target_array = dask_array.from_zarr('sentiment_target_array.zarr')

### week 06 part 5

In [12]:
# Listing 10.8
from dask_ml.linear_model import LogisticRegression
from dask_ml.model_selection import train_test_split

X = feature_array
y = target_array.flatten()

X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)

lr = LogisticRegression()

with ProgressBar():
    lr.fit(X_train, y_train)

[########################################] | 100% Completed |  0.8s
[##                                      ] | 7% Completed |  0.2s

  return np.exp(A)


[########################################] | 100% Completed |  7.0s
[########################################] | 100% Completed |  7.0s
[########################################] | 100% Completed |  6.5s
[########################################] | 100% Completed |  6.0s
[########################################] | 100% Completed |  6.8s
[########################################] | 100% Completed |  6.3s
[########################################] | 100% Completed |  6.9s
[########################################] | 100% Completed |  7.1s
[########################################] | 100% Completed |  6.9s
[########################################] | 100% Completed |  7.0s
[########################################] | 100% Completed |  6.9s
[########################################] | 100% Completed |  6.7s
[########################################] | 100% Completed |  6.6s
[########################################] | 100% Completed |  6.5s
[########################################] | 100

### Section 10.2.1

In [13]:
# Listing 10.9
lr.score(X_test, y_test).compute()

0.7968370685712275

### week06 part 4

In [14]:
# Listing 10.10
from sklearn.naive_bayes import BernoulliNB
from dask_ml.wrappers import Incremental

nb = BernoulliNB()

parallel_nb = Incremental(nb)

with ProgressBar():
    parallel_nb.fit(X_train, y_train, classes=[0,1])

[########################################] | 100% Completed |  2.9s


In [15]:
# Listing 10.11
parallel_nb.score(X_test, y_test)

0.7898356964430215

### week06 10.2.3

In [16]:
# Listing 10.12
from dask_ml.model_selection import GridSearchCV

parameters = {'penalty': ['l1', 'l2'], 'C': [0.5, 1, 2]}

lr = LogisticRegression()
tuned_lr = GridSearchCV(lr, parameters)

with ProgressBar():
    tuned_lr.fit(X_train, y_train)   

[########################################] | 100% Completed | 31min 46.7s
[########################################] | 100% Completed |  8min 24.5s


In [17]:
# Listing 10.13
import pandas as pd
pd.DataFrame(tuned_lr.cv_results_)

Unnamed: 0,params,mean_fit_time,std_fit_time,mean_score_time,std_score_time,split0_test_score,split1_test_score,split2_test_score,mean_test_score,std_test_score,rank_test_score,param_C,param_penalty
0,"{'C': 0.5, 'penalty': 'l1'}",1894.814204,4.2373,2.762652,1.793734,0.79066,0.79409,0.796506,0.793752,0.002399,1,0.5,l1
1,"{'C': 0.5, 'penalty': 'l2'}",377.200217,6.974454,6.257253,0.446388,0.790836,0.793821,0.796477,0.793711,0.002304,2,0.5,l2
2,"{'C': 1, 'penalty': 'l1'}",1834.167502,50.909252,3.119642,1.678937,0.790736,0.793668,0.796483,0.793629,0.002346,6,1.0,l1
3,"{'C': 1, 'penalty': 'l2'}",233.634461,0.233799,5.780558,0.168163,0.790836,0.793821,0.796477,0.793711,0.002304,2,1.0,l2
4,"{'C': 2, 'penalty': 'l1'}",1076.348245,40.163175,0.972806,0.106549,0.790707,0.793727,0.796512,0.793649,0.002371,5,2.0,l1
5,"{'C': 2, 'penalty': 'l2'}",184.970602,2.386207,3.145777,1.626576,0.790836,0.793821,0.796477,0.793711,0.002304,2,2.0,l2


### week 06

In [18]:
# Listing 14
import dill
with open('naive_bayes_model.pkl', 'wb') as file:
    dill.dump(parallel_nb, file)

In [22]:
import numpy as np

In [23]:
# Listing 15
with open('naive_bayes_model.pkl', 'rb') as file:
    nb = dill.load(file)
nb.predict(np.random.randint(0,2,(100,100)))

array([1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1,
       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1])