In [2]:
from nemo_curator.modifiers.pii_modifier import PiiModifier
from nemo_curator.datasets import DocumentDataset
from nemo_curator.modules.modify import Modify
import pandas as pd
import dask
import dask.dataframe as dd

In [3]:
def check_partition_size(dataset):
    partitions = dataset.df.to_delayed()

    # Oblicz rozmiar każdej partycji
    partition_sizes = [part.memory_usage(deep=True).sum() for part in partitions]

    # Wywołaj compute, aby uzyskać rzeczywiste wartości rozmiarów
    partition_sizes = dask.compute(*partition_sizes)

    # Wyświetl rozmiary partycji
    for i, size in enumerate(partition_sizes):
        print(f"Rozmiar partycji {i}: {size / (1024 ** 2):.2f} MB")
        
def divide_partition_manual(dataset):
    large_partition_idx = [i for i, size in enumerate(partition_sizes) if size > 500 * 1024 * 1024]  # np. > 500 MB

    for idx in large_partition_idx:
        large_part = dataset.df.get_partition(idx)
        # Rozbijamy dużą partycję na mniejsze fragmenty i łączymy je z pozostałymi
        split_df = dd.from_pandas(large_part.compute(), npartitions=5)
        dataset.df = dataset.df.drop_partitions(idx).concat([dataset.df, split_df])
    return dataset

In [4]:
from dask.distributed import LocalCluster, Client
cluster = LocalCluster(n_workers=2, memory_limit="4GB")
client = Client(cluster)

In [None]:
client

In [4]:
dataset = DocumentDataset.read_json('/code/data/ready', add_filename=True)

In [5]:
check_partition_size(dataset)

In [None]:
dataset = divide_partition_manual(dataset)
dataset

In [6]:
dataset.df = dataset.df.repartition(npartitions=100)

In [None]:
check_partition_size(dataset)

In [8]:
dataset.df

Unnamed: 0,filename,text
0,hemostaza_edu.jsonl,Małopłytkowość u chorych na zespoły mielodyspl...
1,hemostaza_edu.jsonl,Profesor Magdalena Łętowska nową redaktor nacz...
2,hemostaza_edu.jsonl,Iptacopan otrzymał pozytywną opinię CHMP jako ...
3,hemostaza_edu.jsonl,Światowy Dzień Chorych na HemofilięCzym jest h...
4,hemostaza_edu.jsonl,Czy dodanie hydroksychlorochiny do empiryczneg...
...,...,...
882,mp_pl_podrecznik.jsonl,\n\n\n\n\nBadania płynu otrzewnowego (puchlino...
883,mp_pl_podrecznik.jsonl,\n\n\n\n\nDiagnostyka zakażeń bakteriami atypo...
884,mp_pl_podrecznik.jsonl,\n\n\n\n\nDiagnostyka zakażenia Helicobacter p...
885,mp_pl_podrecznik.jsonl,\n\n\n\n\nDiagnostyka zakażeń wirusowych\n\n\n...


In [None]:
def redact_pii(dataset: DocumentDataset) -> DocumentDataset:
    redactor = Modify(
        PiiModifier(
            language="en",
            supported_entities=["PERSON", "EMAIL_ADDRESS"],
            anonymize_action="replace",
            batch_size=2000,
        ),
    )
    return redactor(dataset)

redacted_dataset = redact_pii(dataset)

In [None]:
df = redacted_dataset.to_pandas()

In [None]:
client.shutdown()
cluster.close()

## Toy example

In [38]:
dataframe = pd.DataFrame(
        {"text": ["Sarah and Ryan went out to play", "Jensen is the CEO of NVIDIA", 
                  "Kacper jest super. Mój email to: kacper@gmail.com", "Weronika ma kota. charles@o2.pl"]}
    )
dd = dask.dataframe.from_pandas(dataframe, npartitions=1)
dataset = DocumentDataset(dd)
modified_dataset = redact_pii(dataset)

In [None]:
modified_dataset.df.head()