In [1]:
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path

# import tqdm
from tqdm.notebook import tqdm  # Import the notebook version of tqdm

from datasets import load_dataset
import pandas as pd
import numpy as np
import huggingface_hub
from huggingface_hub import HfFileSystem
hffs = HfFileSystem()

import transformers
transformers.logging.set_verbosity_error()
from transformers import AutoTokenizer


None of PyTorch, TensorFlow >= 2.0, or Flax have been found. Models won't be available and only tokenizers, configuration and file/data utilities can be used.


In [2]:
# Open the 10BT sample of FineWeb-Edu as a streaming dataset (streaming=True, so
# nothing is materialized here).
# NOTE(review): `dataset` is not used by any later cell in this notebook — the
# shards are read directly with pandas below. Consider removing this cell.
dataset = load_dataset("HuggingFaceFW/fineweb-edu", data_files="sample/10BT/*.parquet", streaming=True, split="train")

In [3]:
# List the parquet shard paths of the 10BT sample on the Hub (detail=False -> plain path strings).
files = hffs.ls("datasets/HuggingFaceFW/fineweb-edu/sample/10BT", detail=False)

In [5]:
# Show the shard paths returned by the listing above.
files

['datasets/HuggingFaceFW/fineweb-edu/sample/10BT/000_00000.parquet',
 'datasets/HuggingFaceFW/fineweb-edu/sample/10BT/001_00000.parquet',
 'datasets/HuggingFaceFW/fineweb-edu/sample/10BT/002_00000.parquet',
 'datasets/HuggingFaceFW/fineweb-edu/sample/10BT/003_00000.parquet',
 'datasets/HuggingFaceFW/fineweb-edu/sample/10BT/004_00000.parquet',
 'datasets/HuggingFaceFW/fineweb-edu/sample/10BT/005_00000.parquet',
 'datasets/HuggingFaceFW/fineweb-edu/sample/10BT/006_00000.parquet',
 'datasets/HuggingFaceFW/fineweb-edu/sample/10BT/007_00000.parquet',
 'datasets/HuggingFaceFW/fineweb-edu/sample/10BT/008_00000.parquet',
 'datasets/HuggingFaceFW/fineweb-edu/sample/10BT/009_00000.parquet',
 'datasets/HuggingFaceFW/fineweb-edu/sample/10BT/010_00000.parquet',
 'datasets/HuggingFaceFW/fineweb-edu/sample/10BT/011_00000.parquet',
 'datasets/HuggingFaceFW/fineweb-edu/sample/10BT/012_00000.parquet',
 'datasets/HuggingFaceFW/fineweb-edu/sample/10BT/013_00000.parquet']

In [4]:
# Work on the first shard only; its basename doubles as the local cache filename.
file = files[0]

In [None]:
# Load the first shard. Prefer a local copy (named after the shard's basename);
# if none exists yet, read it from the Hub through the hf:// filesystem and save
# it locally. This makes the cell work on a fresh kernel/checkout instead of
# depending on a previously-run (now commented-out) `to_parquet` cell, while
# later re-runs still skip the download.
local_shard = Path(file.split("/")[-1])
if local_shard.exists():
    df = pd.read_parquet(local_shard)
else:
    df = pd.read_parquet("hf://" + file)
    df.to_parquet(local_shard)

In [None]:
# Preview the shard; chunk_row below reads its "text", "id", "url", "score" and "dump" columns.
df.head()

In [None]:
# df.to_parquet(files[0].split("/")[-1])

In [None]:
# Token budget per chunk. Used for the tokenizer's model_max_length in
# process_dataframe and in the progress message of the chunking cell.
# NOTE: chunk_row does not read this global — it carries its own hard-coded
# 512 — so keep the two values in sync if you change this.
MAX_TOKENS = 512

In [None]:
# keep_keys = ["id", "url", "score", "dump"]

In [None]:
# tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased", model_max_length=MAX_TOKENS)

In [None]:
# def chunk(rows):
#     texts = rows["text"]
#     chunks_index = []
#     chunks_text = []
#     chunks_tokens = []
#     updated_token_counts = []

#     # Assuming you have other properties in the rows that you want to retain
#     keep = {key: [] for key in keep_keys}

#     for index, text in enumerate(texts):
#         tokens = tokenizer.encode(text)
#         token_count = len(tokens)

#         if token_count > MAX_TOKENS:
#             overlap = int(MAX_TOKENS * 0.1)
#             start_index = 0
#             ci = 0
#             while start_index < len(tokens):
#                 end_index = min(start_index + MAX_TOKENS, len(tokens))
#                 chunk = tokens[start_index:end_index]
#                 chunks_index.append(ci)
#                 chunks_tokens.append(chunk)
#                 updated_token_counts.append(len(chunk))
#                 chunks_text.append(tokenizer.decode(chunk))
#                 # Copy other properties for each chunk
#                 for key in keep:
#                     keep[key].append(rows[key][index])
#                 start_index += MAX_TOKENS - overlap
#                 ci += 1
#         else:
#             chunks_index.append(0)
#             chunks_text.append(text)
#             chunks_tokens.append(tokens)
#             updated_token_counts.append(token_count)
#             # Copy other properties for non-chunked texts
#             for key in keep:
#                 keep[key].append(rows[key][index])

#     keep["chunk_index"] = chunks_index
#     keep["chunk_text"] = chunks_text
#     keep["chunk_tokens"] = chunks_tokens
#     keep["chunk_token_count"] = updated_token_counts
#     return keep


In [None]:
def chunk_row(row, tokenizer, max_tokens=512, overlap_ratio=0.1,
              keep_keys=("id", "url", "score", "dump")):
    """Split one document into overlapping windows of at most ``max_tokens`` token ids.

    Args:
        row: Mapping with a ``"text"`` entry plus every key in ``keep_keys``
            (e.g. one record of ``DataFrame.to_dict(orient="records")``).
        tokenizer: Object exposing ``encode(text) -> list`` and
            ``decode(ids, skip_special_tokens=...) -> str`` (e.g. a Hugging Face tokenizer).
        max_tokens: Maximum number of token ids per chunk.
        overlap_ratio: Fraction of ``max_tokens`` shared by consecutive chunks.
        keep_keys: Metadata keys copied from ``row`` into every chunk.

    Returns:
        List of dicts, one per chunk, with keys ``chunk_index``, ``chunk_text``,
        ``chunk_tokens``, ``chunk_token_count`` followed by ``keep_keys``.
        ``chunk_tokens`` are the raw ids from ``tokenizer.encode`` (including any
        special tokens it adds). A document whose encoding fits in ``max_tokens``
        yields exactly one chunk whose ``chunk_text`` is the untouched original text.

    Raises:
        ValueError: if ``max_tokens`` < 1 or the overlap leaves no forward stride.
    """
    if max_tokens < 1:
        raise ValueError(f"max_tokens must be >= 1, got {max_tokens}")
    overlap = int(max_tokens * overlap_ratio)
    stride = max_tokens - overlap
    if stride < 1:
        # Without a positive stride the window never advances (infinite loop).
        raise ValueError(f"overlap_ratio={overlap_ratio} leaves no forward stride for max_tokens={max_tokens}")

    text = row["text"]
    metadata = {key: row[key] for key in keep_keys}  # identical for every chunk of this row

    tokens = tokenizer.encode(text)
    token_count = len(tokens)

    if token_count <= max_tokens:
        return [{
            "chunk_index": 0,
            "chunk_text": text,
            "chunk_tokens": tokens,
            "chunk_token_count": token_count,
            **metadata,
        }]

    chunks = []
    start = 0
    while True:
        end = min(start + max_tokens, token_count)
        window = tokens[start:end]
        chunks.append({
            "chunk_index": len(chunks),
            # skip_special_tokens keeps markers such as [CLS]/[SEP] (added by
            # encode) out of the text, so long-document chunks read like the
            # raw text returned for short documents.
            "chunk_text": tokenizer.decode(window, skip_special_tokens=True),
            "chunk_tokens": window,
            "chunk_token_count": len(window),
            **metadata,
        })
        if end == token_count:
            # This window already reaches the last token; advancing again would
            # only emit a chunk fully contained in this one.
            break
        start += stride

    return chunks


In [None]:
def process_dataframe(df, max_workers=16, batch_size=200):
    """Chunk every row of ``df`` with ``chunk_row`` using a thread pool.

    Rows are converted to records, grouped into batches of ``batch_size`` and
    processed by up to ``max_workers`` threads. A progress bar tracks rows done.

    Args:
        df: DataFrame whose records carry the fields ``chunk_row`` reads
            ("text" plus its metadata keys).
        max_workers: Number of worker threads.
        batch_size: Number of rows per submitted task.

    Returns:
        Flat list of chunk dicts in source-row order (and chunk order within a
        row), so repeated runs produce identically ordered output regardless of
        which batch finishes first.
    """
    # One tokenizer per worker thread instead of one per batch: loading it via
    # from_pretrained is costly, and this still never shares an instance
    # between threads (each thread runs one batch at a time).
    thread_state = threading.local()

    def get_tokenizer():
        tokenizer = getattr(thread_state, "tokenizer", None)
        if tokenizer is None:
            tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased", model_max_length=MAX_TOKENS)
            thread_state.tokenizer = tokenizer
        return tokenizer

    def process_batch(batch):
        tokenizer = get_tokenizer()
        batch_chunks = []
        for row in batch:
            batch_chunks.extend(chunk_row(row, tokenizer))
        return batch_chunks

    records = df.to_dict(orient="records")
    batches = [records[i:i + batch_size] for i in range(0, len(records), batch_size)]
    # Results are slotted by batch index so completion order cannot reorder rows.
    results = [None] * len(batches)

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_index = {executor.submit(process_batch, batch): i for i, batch in enumerate(batches)}
        # The bar is updated only from this (main) thread and is closed even on error.
        with tqdm(total=len(records), desc="Processing Rows") as pbar:
            for future in as_completed(future_to_index):
                index = future_to_index[future]
                results[index] = future.result()  # re-raises any worker exception
                pbar.update(len(batches[index]))

    return [chunk for batch_chunks in results for chunk in batch_chunks]

In [None]:
# Process the DataFrame and create a new DataFrame from the list of chunks
# Each step is timed separately with time.perf_counter(); `start` is reset between them.
start = time.perf_counter()
print(f"Chunking text that is longer than {MAX_TOKENS} tokens")
chunked_data = process_dataframe(df)  # list of chunk dicts
print(f"Dataset chunked in {time.perf_counter() - start:.2f} seconds")
start = time.perf_counter()
# One DataFrame row per chunk; columns come from the chunk dict keys.
chunked_df = pd.DataFrame(chunked_data)
print(f"Dataset converted to DataFrame in {time.perf_counter() - start:.2f} seconds")


In [None]:
# chunked_df.to_parquet("chunked-" + file.split("/")[-1])

In [None]:
# Number of chunks produced; at least the number of source rows, since every row yields one or more chunks.
len(chunked_df)

In [None]:
# Preview the chunk records (chunk fields plus the copied row metadata).
chunked_df.head()