# Text Vectorization Pipeline

This example illustrates how Dask-ML can be used to classify large textual datasets in parallel.
It is adapted from [this scikit-learn example](https://scikit-learn.org/stable/auto_examples/applications/plot_out_of_core_classification.html#sphx-glr-auto-examples-applications-plot-out-of-core-classification-py).

The primary differences are that

* We fit the entire model, including text vectorization, as a pipeline.
* We use dask collections like [Dask Bag](https://docs.dask.org/en/latest/bag.html), [Dask Dataframe](https://docs.dask.org/en/latest/dataframe.html), and [Dask Array](https://docs.dask.org/en/latest/array.html)
  rather than generators to work with larger than memory datasets.

In [6]:
%pip install -U dask[complete]

Collecting dask[complete]
  Downloading dask-2024.5.1-py3-none-any.whl.metadata (3.8 kB)
Collecting dask-expr<1.2,>=1.1 (from dask[complete])
  Downloading dask_expr-1.1.1-py3-none-any.whl.metadata (2.4 kB)
Collecting distributed==2024.5.1 (from dask[complete])
  Downloading distributed-2024.5.1-py3-none-any.whl.metadata (3.4 kB)
Downloading dask-2024.5.1-py3-none-any.whl (1.2 MB)
   ---------------------------------------- 0.0/1.2 MB ? eta -:--:--
   -- ------------------------------------- 0.1/1.2 MB 1.3 MB/s eta 0:00:01
   -------- ------------------------------- 0.3/1.2 MB 2.8 MB/s eta 0:00:01
   -------------------- ------------------- 0.6/1.2 MB 4.5 MB/s eta 0:00:01
   ---------------------------------------- 1.2/1.2 MB 6.5 MB/s eta 0:00:00
Downloading distributed-2024.5.1-py3-none-any.whl (1.0 MB)
   ---------------------------------------- 0.0/1.0 MB ? eta -:--:--
   ---------------------------------------- 1.0/1.0 MB 66.6 MB/s eta 0:00:00
Downloading dask_expr-1.1.1-py3-none-a



In [1]:
from dask.distributed import Client, progress

client = Client(n_workers=2, threads_per_worker=2, memory_limit='2GB')
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 2
Total threads: 4,Total memory: 3.73 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:60144,Workers: 2
Dashboard: http://127.0.0.1:8787/status,Total threads: 4
Started: Just now,Total memory: 3.73 GiB

0,1
Comm: tcp://127.0.0.1:60156,Total threads: 2
Dashboard: http://127.0.0.1:60157/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:60147,
Local directory: C:\Users\copev\AppData\Local\Temp\dask-scratch-space\worker-cramb_do,Local directory: C:\Users\copev\AppData\Local\Temp\dask-scratch-space\worker-cramb_do

0,1
Comm: tcp://127.0.0.1:60159,Total threads: 2
Dashboard: http://127.0.0.1:60160/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:60149,
Local directory: C:\Users\copev\AppData\Local\Temp\dask-scratch-space\worker-zw5496_r,Local directory: C:\Users\copev\AppData\Local\Temp\dask-scratch-space\worker-zw5496_r


## Fetch the data

Scikit-Learn provides a utility to fetch the newsgroups dataset.

In [2]:
import sklearn.datasets

bunch = sklearn.datasets.fetch_20newsgroups()

The data from scikit-learn isn't *too* large, so the data is just
returned in memory. Each document is a string. The target we're predicting
is an integer, which codes the topic of the post.

We'll load the documents and targets directly into a dask DataFrame.
In practice, on a larger than memory dataset, you would likely load the
documents from disk or cloud storage using `dask.bag` or `dask.delayed`.

In [3]:
import dask.dataframe as dd
import pandas as pd

df = dd.from_pandas(
  pd.DataFrame({"text": bunch.data, "target": bunch.target}),
  npartitions=25,
)

df

Unnamed: 0_level_0,text,target
npartitions=25,Unnamed: 1_level_1,Unnamed: 2_level_1
0,string,int32
453,...,...
...,...,...
10862,...,...
11313,...,...


Each row in the `text` column has a bit of metadata and the full text of a post.

In [4]:
print(df.head().loc[0, 'text'][:500])

From: lerxst@wam.umd.edu (where's my thing)
Subject: WHAT car is this!?
Nntp-Posting-Host: rac3.wam.umd.edu
Organization: University of Maryland, College Park
Lines: 15

 I was wondering if anyone out there could enlighten me on this car I saw
the other day. It was a 2-door sports car, looked to be from the late 60s/
early 70s. It was called a Bricklin. The doors were really small. In addition,
the front bumper was separate from the rest of the body. This is 
all I know. If anyone can tellme a m


## Feature Hashing

Dask's [HashingVectorizer](https://ml.dask.org/modules/generated/dask_ml.feature_extraction.text.HashingVectorizer.html#dask_ml.feature_extraction.text.HashingVectorizer) provides a similar API to [scikit-learn's implementation](https://scikit-learn.org/stable/modules/generated/sklearn.feature_extraction.text.HashingVectorizer.html). In fact, Dask-ML's implementation uses scikit-learn's, applying it to each partition of the input `dask.dataframe.Series` or `dask.bag.Bag`.

Transformation, once we actually compute the result, happens in parallel and returns a dask Array.

In [5]:
import dask_ml.feature_extraction.text

vect = dask_ml.feature_extraction.text.HashingVectorizer()
X = vect.fit_transform(df['text'])
X

ValueError: Metadata inference failed in `_transformer`.

You have supplied a custom function and Dask is unable to 
determine the type of output that that function returns. 

To resolve this please provide a meta= keyword.
The docstring of the Dask function you ran should have more information.

Original error is below:
------------------------
AttributeError("'NAType' object has no attribute 'lower'")

Traceback:
---------
  File "c:\Users\copev\.pyenv\pyenv-win\versions\3.11.4\Lib\site-packages\dask\dataframe\utils.py", line 195, in raise_on_meta_error
    yield
  File "c:\Users\copev\.pyenv\pyenv-win\versions\3.11.4\Lib\site-packages\dask_expr\_expr.py", line 3987, in _emulate
    return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\copev\.pyenv\pyenv-win\versions\3.11.4\Lib\site-packages\dask_ml\feature_extraction\text.py", line 36, in _transformer
    return self._hasher(**self.get_params()).transform(part)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\copev\.pyenv\pyenv-win\versions\3.11.4\Lib\site-packages\sklearn\feature_extraction\text.py", line 882, in transform
    X = self._get_hasher().transform(analyzer(doc) for doc in X)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\copev\.pyenv\pyenv-win\versions\3.11.4\Lib\site-packages\sklearn\utils\_set_output.py", line 273, in wrapped
    data_to_wrap = f(self, X, *args, **kwargs)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\copev\.pyenv\pyenv-win\versions\3.11.4\Lib\site-packages\sklearn\feature_extraction\_hash.py", line 179, in transform
    indices, indptr, values = _hashing_transform(
                              ^^^^^^^^^^^^^^^^^^^
  File "sklearn\\feature_extraction\\_hashing_fast.pyx", line 40, in sklearn.feature_extraction._hashing_fast.transform
  File "c:\Users\copev\.pyenv\pyenv-win\versions\3.11.4\Lib\site-packages\sklearn\feature_extraction\_hash.py", line 177, in <genexpr>
    raw_X = (((f, 1) for f in x) for x in raw_X_)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\copev\.pyenv\pyenv-win\versions\3.11.4\Lib\site-packages\sklearn\feature_extraction\text.py", line 882, in <genexpr>
    X = self._get_hasher().transform(analyzer(doc) for doc in X)
                                     ^^^^^^^^^^^^^
  File "c:\Users\copev\.pyenv\pyenv-win\versions\3.11.4\Lib\site-packages\sklearn\feature_extraction\text.py", line 110, in _analyze
    doc = preprocessor(doc)
          ^^^^^^^^^^^^^^^^^
  File "c:\Users\copev\.pyenv\pyenv-win\versions\3.11.4\Lib\site-packages\sklearn\feature_extraction\text.py", line 68, in _preprocess
    doc = doc.lower()
          ^^^^^^^^^


The output array `X` has unknown chunk sizes becase the input dask Series or Bags don't know their own length.

Each block in `X` is a `scipy.sparse` matrix.

In [6]:
X.blocks[0].compute()

NameError: name 'X' is not defined

This is a document-term matrix. Each row is the hashed representation of the original post.

## Classification Pipeline

We can combine the [HashingVectorizer](https://ml.dask.org/modules/generated/dask_ml.feature_extraction.text.HashingVectorizer.html#dask_ml.feature_extraction.text.HashingVectorizer) with [Incremental](https://ml.dask.org/modules/generated/dask_ml.wrappers.Incremental.html#dask_ml.wrappers.Incremental) and a classifier like scikit-learn's `SGDClassifier` to
create a classification pipeline.

We'll predict whether the topic was in the `comp` category.

In [None]:
bunch.target_names

In [None]:
import numpy as np

positive = np.arange(len(bunch.target_names))[['comp' in x for x in bunch.target_names]]
y = df['target'].isin(positive).astype(int)
y

In [None]:
import numpy as np
import sklearn.linear_model
import sklearn.pipeline

import dask_ml.wrappers

Because the input comes from a dask Series, with unknown chunk sizes, we need to specify `assume_equal_chunks=True`. This tells Dask-ML that we know that each partition in `X`
matches a partition in `y`.

In [None]:
sgd = sklearn.linear_model.SGDClassifier(
    tol=1e-3
)
clf = dask_ml.wrappers.Incremental(
    sgd, scoring='accuracy', assume_equal_chunks=True
)
pipe = sklearn.pipeline.make_pipeline(vect, clf)

`SGDClassifier.partial_fit` needs to know the full set of classes up front.
Because our `sgd` is wrapped inside an `Incremental`, we need to pass it through
as the `incremental__classes` keyword argument in `fit`.

In [None]:
pipe.fit(df['text'], y,
         incremental__classes=[0, 1]);

As usual, `Incremental.predict` lazily returns the predictions as a dask Array.

In [None]:
predictions = pipe.predict(df['text'])
predictions

We can compute the predictions and score in parallel with `dask_ml.metrics.accuracy_score`.

In [None]:
dask_ml.metrics.accuracy_score(y, predictions)

This simple combination of a HashingVectorizer and SGDClassifier is
pretty effective at this prediction task.