In [1]:
import pandas as pd
import dask.dataframe as dd
import ast
from nltk.tokenize import word_tokenize
from contractions import contractions_dict  # your custom contraction dictionary
import nltk
nltk.download('punkt')

# ========================
# 📂 Load CSV (adjust path

df = dd.read_csv(
    "all-the-news-2-1-cleaned.csv",
    usecols=['article'],  # or any subset you actually need
    dtype={'article': 'object'},
    on_bad_lines='skip',
    encoding='utf-8'
)

# ========================
# 🔤 Normalize text
# ========================
def normalize_text(x):
    if pd.isna(x) or x == '':
        return ''
    return x.lower()

normalized_reviews = df['article'].map_partitions(
    lambda col: col.map(normalize_text),
    meta=pd.Series(dtype=object)
)

# ========================
# 🧠 Tokenize safely
# ========================
def text_tokenization(x):
    try:
        if pd.isna(x) or x.strip() == '':
            return []
        return word_tokenize(x)
    except Exception:
        return []

tokenized = normalized_reviews.map_partitions(
    lambda col: col.map(text_tokenization),
    meta=pd.Series(dtype=object)
)

# ========================
# 🛠️ Fix stringified lists (if needed)
# ========================
def safe_eval(x):
    try:
        return ast.literal_eval(x) if isinstance(x, str) else x
    except Exception:
        return []

tokenized = tokenized.map_partitions(
    lambda col: col.map(safe_eval),
    meta=pd.Series(dtype=object)
)

# ========================
# 🔧 Expand contractions
# ========================
def expand_token(token):
    return contractions_dict.get(token, token)

def expand_contractions(tokens):
    if isinstance(tokens, list):
        expanded = []
        for token in tokens:
            expanded.extend(expand_token(token).split())
        return expanded
    return []

contracted_reviews = tokenized.map_partitions(
    lambda col: col.map(expand_contractions),
    meta=pd.Series(dtype=object)
)

# ========================
# ✅ Inspect sample
# ========================
sample = contracted_reviews.head(3)
for i, row in enumerate(sample):
    print(f"\nRow {i}:\n{row}\nType: {type(row)}")


[nltk_data] Downloading package punkt to
[nltk_data]     C:\Users\Lenovo\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!



Row 0:
Type: <class 'list'>

Row 1:
['davos', ',', 'switzerland', '(', 'reuters', ')', '-', 'u.s.', 'president', 'donald', 'trump', 'denied', 'a', 'report', 'on', 'friday', 'that', 'he', 'had', 'ordered', 'special', 'counsel', 'robert', 'mueller', 'fired', 'last', 'june', ',', 'calling', 'it', '“', 'fake', 'news', '”', '.', 'the', 'new', 'york', 'times', 'reported', 'on', 'thursday', 'that', 'trump', 'backed', 'down', 'from', 'his', 'order', 'after', 'the', 'white', 'house', 'counsel', 'threatened', 'to', 'resign', 'rather', 'than', 'follow', 'his', 'directive', ',', 'citing', 'four', 'people', 'told', 'of', 'the', 'matter', '.', '“', 'fake', 'news', ',', 'folks', ',', 'fake', 'news', ',', '”', 'trump', 'told', 'reporters', 'in', 'davos', ',', 'when', 'asked', 'about', 'the', 'report', '.', 'reporting', 'by', 'steve', 'holland']
Type: <class 'list'>

Row 2:
['paris', '(', 'reuters', ')', '-', 'former', 'french', 'president', 'nicolas', 'sarkozy', 'published', 'a', 'new', 'memoir', 'on

In [2]:
regex = r'^@[a-zA-z0-9]|^#[a-zA-Z0-9]|\w+:\/{2}[\d\w-]+(\.[\d\w-]+)*(?:(?:\/[^\s/]*))*|\W+|\d+|<("[^"]*"|\'[^\']*\'|[^\'">])*>|_+|[^\u0000-\u007f]+'

In [3]:
import re
from nltk.corpus import stopwords
from spacy.lang.en.stop_words import STOP_WORDS
from itertools import filterfalse
from nltk import pos_tag
from nltk.stem import WordNetLemmatizer
from nltk.corpus import wordnet as wn
from sklearn.feature_extraction.text import TfidfVectorizer

In [4]:
def waste_word_or_not(token):
    return bool(re.search(regex, token))

# Apply filter to a tokenized list
def filter_waste_words(tokens):
    if isinstance(tokens, list):
        return list(filterfalse(waste_word_or_not, tokens))
    return []

# Apply over contracted tokenized reviews
filtered_reviews = contracted_reviews.map_partitions(
    lambda col: col.map(filter_waste_words),
    meta=pd.Series(dtype=object)
)

# Preview
filtered_reviews.head(2)


0    [this, post, is, part, of, polyarchy, an, inde...
1    [davos, switzerland, reuters, president, donal...
dtype: object

In [5]:
def split(tokens):
    if isinstance(tokens, list):
        return [re.split(regex, x)[0] for x in tokens]
    return []
# 1. Split on regex
filtered_reviews = filtered_reviews.map_partitions(
    lambda col: col.map(split),
    meta=pd.Series(dtype=object)
)
en_stop_words = set(stopwords.words('english')).union(STOP_WORDS)
def is_stopword(token):
    return not (
        token in en_stop_words or 
        re.search(r'\b\w\b|[^\u0000-\u007f]+|_+|\W+', token)
    )

def stopwords_removal(tokens):
    if isinstance(tokens, list):
        return list(filter(is_stopword, tokens))
    return []

# 2. Remove stopwords and noise
without_stopwords_reviews = filtered_reviews.map_partitions(
    lambda col: col.map(stopwords_removal),
    meta=pd.Series(dtype=object)
)
without_stopwords_reviews.head(2)


0    [post, polyarchy, independent, blog, produced,...
1    [davos, switzerland, reuters, president, donal...
dtype: object

In [6]:
without_stopwords_reviews.shape

(<dask_expr.expr.Scalar: expr=MapPartitions(lambda).size(), dtype=int32>,)

In [7]:
from pos_helpers import process_partition
tagged_reviews = without_stopwords_reviews.map_partitions(process_partition, meta=('x', 'object'))
tagged_reviews.head(2)

0    [(post, n), (polyarchy, n), (independent, a), ...
1    [(davos, n), (switzerland, n), (reuters, n), (...
Name: x, dtype: object

In [8]:
# Instantiate lemmatizer at top-level (important for Dask)
lemmatizer = WordNetLemmatizer()

def token_lemmatization(token_pos_tuple):
    if token_pos_tuple is None or len(token_pos_tuple) < 2:
        return ""
    return lemmatizer.lemmatize(word=token_pos_tuple[0], pos=token_pos_tuple[1])

def lemmatization(review):
    if isinstance(review, list) and len(review) > 0:
        return list(map(token_lemmatization, review))
    return [""]

def process_lemmatization_partition(partition_series):
    return partition_series.map(lemmatization)

# Apply in Dask using named function (not lambda)
lemmatized_reviews = tagged_reviews.map_partitions(
    process_lemmatization_partition,
    meta=('x', 'object')
)

# View result
print(lemmatized_reviews.head(2))

0    [post, polyarchy, independent, blog, produce, ...
1    [davos, switzerland, reuters, president, donal...
Name: x, dtype: object


In [9]:
extracted_tokens = lemmatized_reviews  # This is a dask Series
import dask.bag as db

# Convert series to bag
token_bag = lemmatized_reviews.to_bag()

In [10]:
def count(accumulator,element):
    return accumulator + 1
def combine(total_1,total_2):
    return total_1 + total_2
from dask.distributed import Client
client = Client(processes=None)
token_counts = token_bag.flatten().foldby(
    key=lambda x: x,      # Group by token itself
    binop=count,          # Count each occurrence
    initial=0,            # Start from 0
    combine=combine,      # Combine results from partitions
    combine_initial=0     # Start from 0 when combining
).compute()

top_tokens = sorted(token_counts,key=lambda x:x[1],reverse=True)
top_100_tokens = list(map(lambda x:x[0],top_tokens[:100]))
top_100_tokens[:2]

['say', 'like']

In [11]:
import numpy as np
def extract_bow_vector(review):
    one_hot_encoded_bow_vector = np.where(np.isin(top_100_tokens,review),1,0)
    review = one_hot_encoded_bow_vector
    return review
model_data = lemmatized_reviews.map_partitions(
    lambda col: col.map(extract_bow_vector),
    meta=pd.Series(dtype=object))
model_data.head(2)

0    [0, 1, 1, 1, 0, 1, 1, 0, 0, 1, 0, 1, 1, 0, 1, ...
1    [0, 0, 1, 0, 1, 1, 1, 0, 0, 0, 0, 0, 1, 0, 0, ...
dtype: object

In [12]:
import dask.array as da
model_bag = model_data.to_bag()
def to_dask_array(vec):
    return da.from_array(vec, chunks=500).reshape(1, -1)

def partition_elements_stacker(partition_content):
    partition_list = list(partition_content)
    return da.concatenate(partition_list, axis=0)

bow_matrix = model_bag.map(to_dask_array).reduction(
    perpartition=partition_elements_stacker,
    aggregate=partition_elements_stacker
).compute()

2025-08-09 01:18:17,196 - distributed.scheduler - ERROR - Task partition_elements_stacker-aggregate-finalize-hlgfinalizecompute-22ff46393343492ebf8fd4684d76bcb2 marked as failed because 4 workers died while trying to run it


KilledWorker: Attempted to run task 'partition_elements_stacker-aggregate-finalize-hlgfinalizecompute-22ff46393343492ebf8fd4684d76bcb2' on 4 different workers, but all those workers died while running it. The last worker that attempt to run the task was tcp://127.0.0.1:49188. Inspecting worker logs is often a good next step to diagnose what went wrong. For more information see https://distributed.dask.org/en/stable/killed.html.

In [None]:
bow_matrix

In [None]:
!pip install zarr

bow_matrix.rechunk(500).to_zarr('bow_matrix.zarr')

In [None]:
import dask.dataframe as dd

# Convert to Dask DataFrame using top_100_tokens as column names
bow_ddf = dd.from_dask_array(bow_matrix, columns=top_100_tokens)

# Preview the first few rows
bow_ddf.head(5)
