In [31]:
import pandas as pd
import ray
import re
import nltk
from nltk.corpus import stopwords
import numpy as np
from transformers import BertTokenizer

### Setup

In [32]:
# Keep Ray Data's output order deterministic (needed for reproducible shuffles/splits).
# `DataContext` is the current name; `DatasetContext` is its older alias.
ray.data.DataContext.get_current().execution_options.preserve_order = True  # deterministic

### Ingesting

In [33]:
# Load the raw CSV, shuffle rows with a fixed seed, and peek at one record.
ds = ray.data.read_csv('dataset.csv').random_shuffle(seed=42)
ds.take(1)

2023-11-08 10:56:05,195	INFO read_api.py:406 -- To satisfy the requested parallelism of 8, each read task output is split into 8 smaller blocks.
2023-11-08 10:56:05,255	INFO streaming_executor.py:93 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV->SplitBlocks(8)] -> AllToAllOperator[RandomShuffle] -> LimitOperator[limit=1]
2023-11-08 10:56:05,255	INFO streaming_executor.py:94 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=True, actor_locality_enabled=True, verbose_progress=False)
2023-11-08 10:56:05,255	INFO streaming_executor.py:96 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


- RandomShuffle 1:   0%|          | 0/64 [00:00<?, ?it/s]

Shuffle Map 2:   0%|          | 0/64 [00:00<?, ?it/s]

Shuffle Reduce 3:   0%|          | 0/64 [00:00<?, ?it/s]

Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

[{'id': 1968,
  'created_on': datetime.datetime(2020, 7, 26, 4, 48, 40),
  'title': 'Sentiment Analysis With Transformers',
  'description': 'Sentiment analysis neural network trained by fine-tuning BERT, ALBERT, or DistilBERT on the Stanford Sentiment Treebank.',
  'tag': 'natural-language-processing'}]

### Splitting

In [34]:
import sys
# Make the parent of the current working directory importable so `madewithml`
# resolves. The guard stops re-runs of this cell from piling duplicate entries onto sys.path.
if ".." not in sys.path:
    sys.path.append("..")
from madewithml.data import stratify_split

TEST_SIZE = 0.2  # passed as test_size; presumably the fraction routed to val_ds
train_ds, val_ds = stratify_split(ds, stratify="tag", test_size=TEST_SIZE)


2023-11-08 10:56:07,039	INFO streaming_executor.py:93 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV->SplitBlocks(8)] -> AllToAllOperator[RandomShuffle] -> LimitOperator[limit=1]
2023-11-08 10:56:07,039	INFO streaming_executor.py:94 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=True, actor_locality_enabled=True, verbose_progress=False)
2023-11-08 10:56:07,039	INFO streaming_executor.py:96 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


- RandomShuffle 1:   0%|          | 0/64 [00:00<?, ?it/s]

Shuffle Map 2:   0%|          | 0/64 [00:00<?, ?it/s]

Shuffle Reduce 3:   0%|          | 0/64 [00:00<?, ?it/s]

Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

In [35]:
# Mapping
# Label -> index mapping built from the training split only. Sorting makes the
# indices independent of whatever order `unique` returns tags in. Missing tags
# are excluded because `preprocess` drops unlabeled rows anyway.
tags = sorted(tag for tag in train_ds.unique(column="tag") if pd.notna(tag))
class_to_idx = {tag: i for i, tag in enumerate(tags)}

2023-11-08 10:56:08,394	INFO streaming_executor.py:93 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV->SplitBlocks(8)] -> AllToAllOperator[RandomShuffle] -> AllToAllOperator[Sort] -> AllToAllOperator[MapBatches(group_fn)->MapBatches(_filter_split)->RandomShuffle] -> LimitOperator[limit=1]
2023-11-08 10:56:08,394	INFO streaming_executor.py:94 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=True, actor_locality_enabled=True, verbose_progress=False)
2023-11-08 10:56:08,394	INFO streaming_executor.py:96 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


- RandomShuffle 1:   0%|          | 0/64 [00:00<?, ?it/s]

Shuffle Map 2:   0%|          | 0/64 [00:00<?, ?it/s]

Shuffle Reduce 3:   0%|          | 0/64 [00:00<?, ?it/s]

- Sort 4:   0%|          | 0/64 [00:00<?, ?it/s]

Sort Sample 5:   0%|          | 0/64 [00:00<?, ?it/s]

Shuffle Map 6:   0%|          | 0/64 [00:00<?, ?it/s]

Shuffle Reduce 7:   0%|          | 0/64 [00:00<?, ?it/s]

- MapBatches(group_fn)->MapBatches(_filter_split)->RandomShuffle 8:   0%|          | 0/64 [00:00<?, ?it/s]

Shuffle Map 9:   0%|          | 0/64 [00:00<?, ?it/s]

Shuffle Reduce 10:   0%|          | 0/64 [00:00<?, ?it/s]

Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

Sort Sample 0:   0%|          | 0/8 [00:00<?, ?it/s]

2023-11-08 10:56:10,504	INFO streaming_executor.py:93 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV->SplitBlocks(8)] -> AllToAllOperator[RandomShuffle] -> AllToAllOperator[Sort] -> AllToAllOperator[MapBatches(group_fn)->MapBatches(_filter_split)->RandomShuffle] -> AllToAllOperator[Aggregate] -> TaskPoolMapOperator[MapBatches(<lambda>)]
2023-11-08 10:56:10,504	INFO streaming_executor.py:94 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=True, actor_locality_enabled=True, verbose_progress=False)
2023-11-08 10:56:10,512	INFO streaming_executor.py:96 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


- RandomShuffle 1:   0%|          | 0/64 [00:00<?, ?it/s]

Shuffle Map 2:   0%|          | 0/64 [00:00<?, ?it/s]

Shuffle Reduce 3:   0%|          | 0/64 [00:00<?, ?it/s]

- Sort 4:   0%|          | 0/64 [00:00<?, ?it/s]

Sort Sample 5:   0%|          | 0/64 [00:00<?, ?it/s]

Shuffle Map 6:   0%|          | 0/64 [00:00<?, ?it/s]

Shuffle Reduce 7:   0%|          | 0/64 [00:00<?, ?it/s]

- MapBatches(group_fn)->MapBatches(_filter_split)->RandomShuffle 8:   0%|          | 0/64 [00:00<?, ?it/s]

Shuffle Map 9:   0%|          | 0/64 [00:00<?, ?it/s]

Shuffle Reduce 10:   0%|          | 0/64 [00:00<?, ?it/s]

- Aggregate 11:   0%|          | 0/64 [00:00<?, ?it/s]

Shuffle Map 12:   0%|          | 0/64 [00:00<?, ?it/s]

Shuffle Reduce 13:   0%|          | 0/64 [00:00<?, ?it/s]

Running 0:   0%|          | 0/64 [00:00<?, ?it/s]

Sort Sample 0:   0%|          | 0/8 [00:00<?, ?it/s]

Sort Sample 0:   0%|          | 0/8 [00:00<?, ?it/s]

In [36]:
# Fetch NLTK's stopword corpus (NLTK reports "already up-to-date" when cached),
# then load the word list for the configured language.
STOPWORD_LANGUAGE = "english"
nltk.download("stopwords")
STOPWORDS = stopwords.words(STOPWORD_LANGUAGE)

def clean_text(text, stopwords=None):
    """Clean a raw text string.

    Lowercases the text, removes links, removes stopwords, then reduces the
    result to single-space-separated ASCII alphanumeric tokens.

    Args:
        text: Raw input string.
        stopwords: Words to remove, matched as whole lowercase words. ``None``
            (the default) uses the module-level ``STOPWORDS`` list. An empty
            iterable disables stopword removal.

    Returns:
        The cleaned string (possibly empty).
    """
    if stopwords is None:
        stopwords = STOPWORDS

    # Lower
    text = text.lower()

    # Remove links before punctuation is split out below. Afterwards a URL is just
    # plain words ("https github com ...") and the pattern would only catch "https".
    text = re.sub(r"http\S+", "", text)

    # Remove stopwords, plus any whitespace that follows them. Longest words go
    # first so "don't" wins over "don". Empty entries are skipped because an empty
    # alternative would match at every word boundary and glue words together.
    words = sorted((word for word in stopwords if word), key=len, reverse=True)
    if words:
        pattern = re.compile(r"\b(" + "|".join(map(re.escape, words)) + r")\b\s*")
        text = pattern.sub("", text)

    # Spacing and filters
    text = re.sub(r"([!\"'#$%&()*\+,-./:;<=>?@\\\[\]^_`{|}~])", r" \1 ", text)  # add spacing
    text = re.sub("[^A-Za-z0-9]+", " ", text)  # remove non alphanumeric chars
    text = re.sub(" +", " ", text)  # remove multiple spaces
    return text.strip()  # strip white space at the ends

def tokenize(batch):
    """Turn a pandas batch into BERT model inputs.

    Args:
        batch: pandas DataFrame with a ``text`` column (strings) and a ``tag`` column.

    Returns:
        dict with ``ids`` (token ids) and ``masks`` (attention mask) as NumPy
        arrays padded to the longest text in the batch, plus ``targets`` (NumPy
        array built from the ``tag`` column).
    """
    # NOTE(review): the tokenizer is loaded via from_pretrained on every call, i.e. once per batch.
    tokenizer = BertTokenizer.from_pretrained("allenai/scibert_scivocab_uncased", return_dict=False)
    texts = batch["text"].tolist()
    encoded = tokenizer(texts, return_tensors="np", padding="longest")
    return {
        "ids": encoded["input_ids"],
        "masks": encoded["attention_mask"],
        "targets": np.array(batch["tag"]),
    }

def preprocess(df, class_to_index):
    """Turn a raw pandas batch into tokenized model inputs.

    Args:
        df: pandas DataFrame batch with ``title``, ``description`` and ``tag``
            columns. Other columns (e.g. ``id``, ``created_on``) are ignored.
            The batch itself is not modified.
        class_to_index: Mapping from tag string to integer label.

    Returns:
        The dict produced by ``tokenize`` (``ids``, ``masks``, ``targets``).
    """
    # Drop unlabeled rows first so no time is spent cleaning text that gets discarded.
    labeled = df.dropna(subset=["tag"])

    # Feature engineering: title + description. fillna("") stops a missing title
    # or description from turning the whole string into NaN, which clean_text
    # cannot handle (NaN has no .lower()).
    text = labeled["title"].fillna("") + " " + labeled["description"].fillna("")

    # Build a fresh frame holding only text and tag. This avoids mutating the
    # incoming batch and chained-assignment (SettingWithCopy) pitfalls.
    features = pd.DataFrame({
        "text": text.apply(clean_text),  # clean text
        "tag": labeled["tag"].map(class_to_index),  # label encoding
    })
    # NOTE(review): tags missing from class_to_index map to NaN here.
    return tokenize(features)



[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\Saud\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [39]:
# Distributed preprocessing
# Apply `preprocess` to pandas batches of the training split, then show one row.
preprocess_kwargs = {"class_to_index": class_to_idx}
sample_ds = train_ds.map_batches(
    preprocess,
    fn_kwargs=preprocess_kwargs,
    batch_format="pandas",
)
sample_ds.show(1)

2023-11-08 10:58:03,674	INFO streaming_executor.py:93 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV->SplitBlocks(8)] -> AllToAllOperator[RandomShuffle] -> AllToAllOperator[Sort] -> AllToAllOperator[MapBatches(group_fn)->MapBatches(_filter_split)->RandomShuffle] -> TaskPoolMapOperator[MapBatches(preprocess)] -> LimitOperator[limit=1]
2023-11-08 10:58:03,682	INFO streaming_executor.py:94 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=True, actor_locality_enabled=True, verbose_progress=False)
2023-11-08 10:58:03,682	INFO streaming_executor.py:96 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


- RandomShuffle 1:   0%|          | 0/64 [00:00<?, ?it/s]

Shuffle Map 2:   0%|          | 0/64 [00:00<?, ?it/s]

Shuffle Reduce 3:   0%|          | 0/64 [00:00<?, ?it/s]

- Sort 4:   0%|          | 0/64 [00:00<?, ?it/s]

Sort Sample 5:   0%|          | 0/64 [00:00<?, ?it/s]

Shuffle Map 6:   0%|          | 0/64 [00:00<?, ?it/s]

Shuffle Reduce 7:   0%|          | 0/64 [00:00<?, ?it/s]

- MapBatches(group_fn)->MapBatches(_filter_split)->RandomShuffle 8:   0%|          | 0/64 [00:00<?, ?it/s]

Shuffle Map 9:   0%|          | 0/64 [00:00<?, ?it/s]

Shuffle Reduce 10:   0%|          | 0/64 [00:00<?, ?it/s]

Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

Sort Sample 0:   0%|          | 0/8 [00:00<?, ?it/s]

{'ids': array([  102,  2886,  1965,  2617, 24423,  1547,  2886,  3997,  1904,
        2617, 24423, 25311,   103,     0,     0,     0,     0,     0,
           0,     0,     0,     0,     0,     0,     0,     0,     0,
           0,     0,     0,     0,     0,     0,     0,     0,     0]), 'masks': array([1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]), 'targets': 2}
