In [1]:
import pandas as pd
import dask.bag as bag
import dask.dataframe as dd
import ast
from nltk.tokenize import word_tokenize
from contractions import contractions_dict  # your custom contraction dictionary
import nltk
nltk.download('punkt')




# ========================
# 📂 Load CSV (adjust path

raw_text = bag.read_text("foods.txt",encoding='cp1252')

[nltk_data] Downloading package punkt to
[nltk_data]     C:\Users\Lenovo\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!


## DATA PREPERATION ##

In [2]:
raw_text.take(20)

('product/productId: B001E4KFG0\n',
 'review/userId: A3SGXH7AUHU8GW\n',
 'review/profileName: delmartian\n',
 'review/helpfulness: 1/1\n',
 'review/score: 5.0\n',
 'review/time: 1303862400\n',
 'review/summary: Good Quality Dog Food\n',
 'review/text: I have bought several of the Vitality canned dog food products and have found them all to be of good quality. The product looks more like a stew than a processed meat and it smells better. My Labrador is finicky and she appreciates this product better than  most.\n',
 '\n',
 'product/productId: B00813GRG4\n',
 'review/userId: A1D87F6ZCVE5NK\n',
 'review/profileName: dll pa\n',
 'review/helpfulness: 0/0\n',
 'review/score: 1.0\n',
 'review/time: 1346976000\n',
 'review/summary: Not as Advertised\n',
 'review/text: Product arrived labeled as Jumbo Salted Peanuts...the peanuts were actually small sized unsalted. Not sure if this was an error or if the vendor intended to represent the product as "Jumbo".\n',
 '\n',
 'product/productId: B000LQ

In [3]:
from dask.delayed import delayed

In [4]:
def get_next_buffer_part(file,start_index,span_index=0,blocksize=1000):
    file.seek(start_index)
    buffer = file.read(blocksize + span_index).decode('cp1252')
    delimeter_position = buffer.find('\n\n')
    if delimeter_position == -1:
        return get_next_buffer_part(file,start_index,span_index+blocksize)
    else:
        file.seek(start_index)
        return start_index,delimeter_position

In [5]:
with open("foods.txt","rb") as file_handle:
    size = file_handle.seek(0,2) - 1       #Get the total size of the file in bytes
    more_data = True                     
    output = list()
    current_position = next_position = 0
    while more_data:
        if current_position >= size:
            more_data = False
        else:
            current_position,next_position = get_next_buffer_part(file_handle,current_position,0)
            output.append((current_position,next_position))
            current_position = current_position + next_position + 2

In [6]:
def get_dict_item(filename,start_index,delimeter_position,encoding='cp1252'):
    with open(filename,"rb") as file_handle:
        file_handle.seek(start_index)
        text = file_handle.read(delimeter_position).decode(encoding)
        elements = text.strip().split("\n")
        key_value_pairs = [(element.split(": ")[0], element.split(": ")[1])
                          if len(element.split(": ")) > 1
                          else ("unknown",element)
                          for element in elements]
        return dict(key_value_pairs)

In [7]:
reviews = bag.from_sequence(output).map(lambda x: get_dict_item("foods.txt",x[0],x[1]))
print(reviews.take(5))

({'product/productId': 'B001E4KFG0', 'review/userId': 'A3SGXH7AUHU8GW', 'review/profileName': 'delmartian', 'review/helpfulness': '1/1', 'review/score': '5.0', 'review/time': '1303862400', 'review/summary': 'Good Quality Dog Food', 'review/text': 'I have bought several of the Vitality canned dog food products and have found them all to be of good quality. The product looks more like a stew than a processed meat and it smells better. My Labrador is finicky and she appreciates this product better than  most.'}, {'product/productId': 'B00813GRG4', 'review/userId': 'A1D87F6ZCVE5NK', 'review/profileName': 'dll pa', 'review/helpfulness': '0/0', 'review/score': '1.0', 'review/time': '1346976000', 'review/summary': 'Not as Advertised', 'review/text': 'Product arrived labeled as Jumbo Salted Peanuts...the peanuts were actually small sized unsalted. Not sure if this was an error or if the vendor intended to represent the product as "Jumbo".'}, {'product/productId': 'B000LQOCH0', 'review/userId':

In [8]:
def fetch_scores(element):
    numeric_score = float(element['review/score'])
    return numeric_score
review_scores = reviews.map(fetch_scores)
print(review_scores.take(5))

(5.0, 1.0, 4.0, 2.0, 5.0)


In [9]:
def tag_reviews(element):
    if float(element['review/score']) > 3:
        element['review/score'] = 'pos'
    else:
        element['review/score'] = 'neg'
    return element
reviews = reviews.map(tag_reviews)

In [10]:
# Extract unique helpfulness strings
unique_help_str = reviews.pluck('review/helpfulness').distinct()

# Compute all unique values as a Python list
all_unique_help = unique_help_str.compute()

print(all_unique_help)  # prints all unique values
print(f"Total unique values: {len(all_unique_help)}")

['1/1', '0/0', '3/3', '4/4', '2/2', '4/5', '0/1', '19/19', '13/13', '9/9', '1/2', '0/7', '2/4', '0/2', '15/15', '5/5', '2/3', '2/8', '17/19', '0/4', '5/6', '8/8', '6/6', '3/4', '1/3', '0/3', '3/10', '1/4', '1/9', '3/7', '6/8', '7/8', '6/7', '1/5', '5/19', '7/7', '3/5', '5/7', '7/13', '0/6', '4/7', '43/47', '13/15', '14/17', '11/14', '20/27', '3/6', '4/8', '2/5', '2/6', '3/8', '2/7', '0/5', '1/7', '7/9', '27/27', '12/12', '1/8', '6/10', '5/10', '10/12', '9/10', '16/17', '12/13', '11/11', '7/10', '4/6', '5/11', '8/12', '8/9', '21/21', '26/31', '19/21', '13/14', '10/11', '15/18', '10/10', '11/12', '14/14', '11/13', '2/19', '16/18', '23/23', '0/18', '165/168', '45/46', '30/30', '100/133', '5/16', '1/10', '5/27', '6/9', '32/36', '24/27', '22/25', '18/26', '5/8', '6/11', '5/12', '11/24', '4/12', '2/9', '14/21', '17/17', '18/22', '13/16', '9/11', '9/12', '6/14', '1/6', '27/29', '15/17', '4/17', '7/11', '5/15', '20/20', '20/22', '0/8', '22/27', '22/23', '8/10', '6/12', '9/14', '7/12', '3/9', '

In [11]:
def score_to_percentage(element):
    xy = element.get('review/helpfulness', '0/1')  # default to 0/1
    try:
        numerator, denominator = xy.split('/')
        numerator = float(numerator)
        denominator = float(denominator)
        percentage = (numerator / denominator) * 100 if denominator != 0 else 0.0
        percentage = round(percentage)  # round to nearest integer
    except Exception:
        percentage = 0

    # ✅ Update only that field
    element['review/helpfulness'] = int(percentage)
    return element

reviews = reviews.map(score_to_percentage)
def fetch_help(element):
    numeric_score = element['review/helpfulness']
    return numeric_score
review_help = reviews.map(fetch_help)
print(review_help.take(5))

(100, 0, 100, 100, 0)


In [12]:
def result(element):
    score = float(element['review/helpfulness'])

    if score < 50:
        element['review/helpfulness'] = 'Fail'
    elif 50 <= score < 75:
        element['review/helpfulness'] = 'Average'
    else:
        element['review/helpfulness'] = 'Great'

    return element

# apply on Dask Bag
reviews = reviews.map(result)

In [13]:
print(reviews.take(5))

({'product/productId': 'B001E4KFG0', 'review/userId': 'A3SGXH7AUHU8GW', 'review/profileName': 'delmartian', 'review/helpfulness': 'Great', 'review/score': 'pos', 'review/time': '1303862400', 'review/summary': 'Good Quality Dog Food', 'review/text': 'I have bought several of the Vitality canned dog food products and have found them all to be of good quality. The product looks more like a stew than a processed meat and it smells better. My Labrador is finicky and she appreciates this product better than  most.'}, {'product/productId': 'B00813GRG4', 'review/userId': 'A1D87F6ZCVE5NK', 'review/profileName': 'dll pa', 'review/helpfulness': 'Fail', 'review/score': 'neg', 'review/time': '1346976000', 'review/summary': 'Not as Advertised', 'review/text': 'Product arrived labeled as Jumbo Salted Peanuts...the peanuts were actually small sized unsalted. Not sure if this was an error or if the vendor intended to represent the product as "Jumbo".'}, {'product/productId': 'B000LQOCH0', 'review/userI

In [14]:
type(reviews)

dask.bag.core.Bag

In [15]:
import pandas as pd
import dask.dataframe as dd

# 1️⃣ Suppose your data is a list of dicts (reviews)
# reviews = [...]  # your large list of dictionaries
first_100_items = reviews.take(100)
# 2️⃣ Convert to pandas DataFrame
pdf = pd.DataFrame(first_100_items)
# 3️⃣ Convert pandas DataFrame → Dask DataFrame
df = dd.from_pandas(pdf, npartitions=5)  # adjust partitions as needed

In [16]:
print(df.head())

  product/productId   review/userId               review/profileName  \
0        B001E4KFG0  A3SGXH7AUHU8GW                       delmartian   
1        B00813GRG4  A1D87F6ZCVE5NK                           dll pa   
2        B000LQOCH0   ABXLMWJIXXAIN  Natalia Corres "Natalia Corres"   
3        B000UA0QIQ  A395BORC6FGVXV                             Karl   
4        B006K2ZZ7K  A1UQRSCLF8GW1T    Michael D. Bigham "M. Wassir"   

  review/helpfulness review/score review/time         review/summary  \
0              Great          pos  1303862400  Good Quality Dog Food   
1               Fail          neg  1346976000      Not as Advertised   
2              Great          pos  1219017600  "Delight" says it all   
3              Great          neg  1307923200         Cough Medicine   
4               Fail          pos  1350777600            Great taffy   

                                         review/text  
0  I have bought several of the Vitality canned d...  
1  Product arrived label

In [17]:
df = df.rename(columns={"review/text": "article"})

In [18]:

# ========================
# 🔤 Normalize text
# ========================
def normalize_text(x):
    if pd.isna(x) or x == '':
        return ''
    return x.lower()

normalized_reviews = df['article'].map_partitions(
    lambda col: col.map(normalize_text),
    meta=pd.Series(dtype=object)
)

# ========================
# 🧠 Tokenize safely
# ========================
def text_tokenization(x):
    try:
        if pd.isna(x) or x.strip() == '':
            return []
        return word_tokenize(x)
    except Exception:
        return []

tokenized = normalized_reviews.map_partitions(
    lambda col: col.map(text_tokenization),
    meta=pd.Series(dtype=object)
)

# ========================
# 🛠️ Fix stringified lists (if needed)
# ========================
def safe_eval(x):
    try:
        return ast.literal_eval(x) if isinstance(x, str) else x
    except Exception:
        return []

tokenized = tokenized.map_partitions(
    lambda col: col.map(safe_eval),
    meta=pd.Series(dtype=object)
)

# ========================
# 🔧 Expand contractions
# ========================
def expand_token(token):
    return contractions_dict.get(token, token)

def expand_contractions(tokens):
    if isinstance(tokens, list):
        expanded = []
        for token in tokens:
            expanded.extend(expand_token(token).split())
        return expanded
    return []

contracted_reviews = tokenized.map_partitions(
    lambda col: col.map(expand_contractions),
    meta=pd.Series(dtype=object)
)

# ========================
# ✅ Inspect sample
# ========================
sample = contracted_reviews.head(3)
for i, row in enumerate(sample):
    print(f"\nRow {i}:\n{row}\nType: {type(row)}")



Row 0:
['i', 'have', 'bought', 'several', 'of', 'the', 'vitality', 'canned', 'dog', 'food', 'products', 'and', 'have', 'found', 'them', 'all', 'to', 'be', 'of', 'good', 'quality', '.', 'the', 'product', 'looks', 'more', 'like', 'a', 'stew', 'than', 'a', 'processed', 'meat', 'and', 'it', 'smells', 'better', '.', 'my', 'labrador', 'is', 'finicky', 'and', 'she', 'appreciates', 'this', 'product', 'better', 'than', 'most', '.']
Type: <class 'list'>

Row 1:
['product', 'arrived', 'labeled', 'as', 'jumbo', 'salted', 'peanuts', '...', 'the', 'peanuts', 'were', 'actually', 'small', 'sized', 'unsalted', '.', 'not', 'sure', 'if', 'this', 'was', 'an', 'error', 'or', 'if', 'the', 'vendor', 'intended', 'to', 'represent', 'the', 'product', 'as', '``', 'jumbo', "''", '.']
Type: <class 'list'>

Row 2:
['this', 'is', 'a', 'confection', 'that', 'has', 'been', 'around', 'a', 'few', 'centuries', '.', 'it', 'is', 'a', 'light', ',', 'pillowy', 'citrus', 'gelatin', 'with', 'nuts', '-', 'in', 'this', 'case', 

In [19]:
regex = r'^@[a-zA-z0-9]|^#[a-zA-Z0-9]|\w+:\/{2}[\d\w-]+(\.[\d\w-]+)*(?:(?:\/[^\s/]*))*|\W+|\d+|<("[^"]*"|\'[^\']*\'|[^\'">])*>|_+|[^\u0000-\u007f]+'

In [20]:
import re
from nltk.corpus import stopwords
from spacy.lang.en.stop_words import STOP_WORDS
from itertools import filterfalse
from nltk import pos_tag
from nltk.stem import WordNetLemmatizer
from nltk.corpus import wordnet as wn
from sklearn.feature_extraction.text import TfidfVectorizer

In [21]:
def waste_word_or_not(token):
    return bool(re.search(regex, token))

# Apply filter to a tokenized list
def filter_waste_words(tokens):
    if isinstance(tokens, list):
        return list(filterfalse(waste_word_or_not, tokens))
    return []

# Apply over contracted tokenized reviews
filtered_reviews = contracted_reviews.map_partitions(
    lambda col: col.map(filter_waste_words),
    meta=pd.Series(dtype=object)
)

# Preview
filtered_reviews.head(2)


0    [i, have, bought, several, of, the, vitality, ...
1    [product, arrived, labeled, as, jumbo, salted,...
dtype: object

In [22]:
def split(tokens):
    if isinstance(tokens, list):
        return [re.split(regex, x)[0] for x in tokens]
    return []
# 1. Split on regex
filtered_reviews = filtered_reviews.map_partitions(
    lambda col: col.map(split),
    meta=pd.Series(dtype=object)
)
en_stop_words = set(stopwords.words('english')).union(STOP_WORDS)
def is_stopword(token):
    return not (
        token in en_stop_words or 
        re.search(r'\b\w\b|[^\u0000-\u007f]+|_+|\W+', token)
    )

def stopwords_removal(tokens):
    if isinstance(tokens, list):
        return list(filter(is_stopword, tokens))
    return []

# 2. Remove stopwords and noise
without_stopwords_reviews = filtered_reviews.map_partitions(
    lambda col: col.map(stopwords_removal),
    meta=pd.Series(dtype=object)
)
without_stopwords_reviews.head(2)


0    [bought, vitality, canned, dog, food, products...
1    [product, arrived, labeled, jumbo, salted, pea...
dtype: object

In [23]:
from pos_helpers import process_partition
tagged_reviews = without_stopwords_reviews.map_partitions(process_partition, meta=('x', 'object'))
tagged_reviews.head(2)

0    [(bought, v), (vitality, n), (canned, v), (dog...
1    [(product, n), (arrived, v), (labeled, a), (ju...
Name: x, dtype: object

In [24]:
# Instantiate lemmatizer at top-level (important for Dask)
lemmatizer = WordNetLemmatizer()

def token_lemmatization(token_pos_tuple):
    if token_pos_tuple is None or len(token_pos_tuple) < 2:
        return ""
    return lemmatizer.lemmatize(word=token_pos_tuple[0], pos=token_pos_tuple[1])

def lemmatization(review):
    if isinstance(review, list) and len(review) > 0:
        return list(map(token_lemmatization, review))
    return [""]

def process_lemmatization_partition(partition_series):
    return partition_series.map(lemmatization)

# Apply in Dask using named function (not lambda)
lemmatized_reviews = tagged_reviews.map_partitions(
    process_lemmatization_partition,
    meta=('x', 'object')
)

# View result
print(lemmatized_reviews.head(2))

0    [buy, vitality, can, dog, food, product, find,...
1    [product, arrive, labeled, jumbo, salt, peanut...
Name: x, dtype: object


In [25]:
extracted_tokens = lemmatized_reviews  # This is a dask Series
import dask.bag as db

# Convert series to bag
token_bag = lemmatized_reviews.to_bag()

In [26]:
def count(accumulator,element):
    return accumulator + 1
def combine(total_1,total_2):
    return total_1 + total_2
from dask.distributed import Client
client = Client(processes=None)
token_counts = token_bag.flatten().foldby(
    key=lambda x: x,      # Group by token itself
    binop=count,          # Count each occurrence
    initial=0,            # Start from 0
    combine=combine,      # Combine results from partitions
    combine_initial=0     # Start from 0 when combining
).compute()

top_tokens = sorted(token_counts,key=lambda x:x[1],reverse=True)
top_100_tokens = list(map(lambda x:x[0],top_tokens[:100]))
top_100_tokens[:2]

['br', 'good']

In [27]:
import numpy as np
def extract_bow_vector(review):
    one_hot_encoded_bow_vector = np.where(np.isin(top_100_tokens,review),1,0)
    review = one_hot_encoded_bow_vector
    return review
model_data = lemmatized_reviews.map_partitions(
    lambda col: col.map(extract_bow_vector),
    meta=pd.Series(dtype=object))
model_data.head(2)

0    [0, 1, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...
1    [0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...
dtype: object

In [28]:
import dask.array as da
model_bag = model_data.to_bag()

## MULTIPLE SVM ##

In [29]:
# Add the BOW vector to the DataFrame
df['bow_vector'] = model_data

# Define a function using the row
def prep_model_data_row(row):
    return {
        'label': 1 if row['review/score'] == 'pos' else 0,
        'bow_vector': row['bow_vector']
    }

# Apply row-wise
model_data_dicts = df.map_partitions(
    lambda partition: partition.apply(prep_model_data_row, axis=1),
    meta=pd.Series(dtype=object)
)
model_data_dicts

Dask Series Structure:
npartitions=5
0     object
16       ...
       ...  
61       ...
75       ...
Dask Name: lambda, 15 expressions
Expr=MapPartitions(lambda)

In [30]:
model_data_dicts.head(5)

0    {'label': 1, 'bow_vector': [0, 1, 0, 1, 1, 0, ...
1    {'label': 0, 'bow_vector': [0, 0, 0, 0, 1, 0, ...
2    {'label': 1, 'bow_vector': [0, 0, 0, 0, 0, 0, ...
3    {'label': 0, 'bow_vector': [0, 1, 0, 0, 0, 0, ...
4    {'label': 1, 'bow_vector': [0, 0, 0, 0, 0, 0, ...
dtype: object

In [31]:
type(model_data_dicts)

dask.dataframe.dask_expr._collection.Series

In [32]:
def extract_label_bow(partition):
    # partition is a pandas Series
    def process(x):
        # Convert each element in bow_vector to int
        bow = [int(i) for i in x['bow_vector']]
        return pd.Series({'label': int(x['label']), 'bow_vector': bow})
    return partition.apply(process)

svm_data = model_data_dicts.map_partitions(extract_label_bow, meta={'label': int, 'bow_vector': object})



In [33]:
svm_data.head

<bound method FrameBase.head of Dask DataFrame Structure:
               label bow_vector
npartitions=5                  
0              int32     object
16               ...        ...
...              ...        ...
61               ...        ...
75               ...        ...
Dask Name: extract_label_bow, 16 expressions
Expr=MapPartitions(extract_label_bow)>

In [34]:
print(svm_data.columns)

Index(['label', 'bow_vector'], dtype='object')


In [35]:
type(svm_data)

dask.dataframe.dask_expr._collection.DataFrame

In [36]:
import dask.array as da
import numpy as np
from sklearn.svm import SVC
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

# Convert bow_vector column (lists) to NumPy array
X = np.array(svm_data['bow_vector'].compute().tolist())  # compute() brings it to memory
y = svm_data['label'].compute()


In [37]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

kernels = ['linear', 'poly', 'rbf', 'sigmoid']
for kernel in kernels:
    clf = SVC(kernel=kernel)
    clf.fit(X_train, y_train)
    y_pred = clf.predict(X_test)
    print(f"SVM with {kernel} kernel accuracy: {accuracy_score(y_test, y_pred):.4f}")


SVM with linear kernel accuracy: 0.6250
SVM with poly kernel accuracy: 0.6250
SVM with rbf kernel accuracy: 0.6250
SVM with sigmoid kernel accuracy: 0.6250


## MULTICLASS SVM ##

In [42]:
# Assume df already has 'bow_vector' and 'review/helpfulness' processed
# Make sure 'review/helpfulness' is numeric or categorical
# df['review/helpfulness'] = df['review/helpfulness'].map(score_to_percentage)

# Create a new DataFrame with only the columns we need
svm_df = df[['review/helpfulness', 'bow_vector']].rename(
    columns={'review/helpfulness': 'label'}
)

In [45]:
# Convert bow_vector column (lists) to NumPy array
X = np.array(svm_df['bow_vector'].compute().tolist())  # compute() brings it to memory
y = svm_df['label'].compute()

In [46]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

kernels = ['linear', 'poly', 'rbf', 'sigmoid']

for kernel in kernels:
    clf = SVC(kernel=kernel, decision_function_shape='ovr')  # ✅ OvR handles multiclass
    clf.fit(X_train, y_train)
    y_pred = clf.predict(X_test)
    print(f"SVM with {kernel} kernel accuracy: {accuracy_score(y_test, y_pred):.4f}")


SVM with linear kernel accuracy: 0.6875
SVM with poly kernel accuracy: 0.5625
SVM with rbf kernel accuracy: 0.5625
SVM with sigmoid kernel accuracy: 0.5625


In [48]:
print(X)

[[0 1 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]
 ...
 [0 1 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]]


In [49]:
print(y)

0     Great
1      Fail
2     Great
3     Great
4      Fail
      ...  
71    Great
72     Fail
73     Fail
74     Fail
75     Fail
Name: label, Length: 76, dtype: string
