# core

> This module contains the core functionality of the library, most notably the `Pipeline` class that applies filters and cleaners to one or more datasets.

In [None]:
#| default_exp core

In [None]:
#| export
import logging
import os

from datasets import concatenate_datasets, Dataset
from rich.logging import RichHandler

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# Attach the Rich handler only once: re-running this cell or reloading the module
# returns the same logger object, and adding another handler each time would print
# every message multiple times.
if not any(isinstance(handler, RichHandler) for handler in logger.handlers):
    logger.addHandler(RichHandler(rich_tracebacks=True))
# Turn off logging for datasets (only errors from the "datasets" logger are shown)
logging.getLogger("datasets").setLevel(logging.ERROR)

  from .autonotebook import tqdm as notebook_tqdm


In [None]:
#| hide
from nbdev.showdoc import *

In [None]:
#| hide
from datasets import load_dataset
from squeakily.filter import check_char_repetition, check_flagged_words, minhash_dedup
from squeakily.clean import remove_empty_lines, normalize_whitespace

In [None]:
#| export
class Pipeline:
    """
    A pipeline is a collection of datasources and their associated transformations to be run.

    Each datasource is a dict with the keys ``"dataset"`` (a `datasets.Dataset`),
    ``"columns"`` (column names; only the first one is transformed), ``"filters"`` and
    ``"cleaners"`` (example-level callables applied to that column's value), plus an
    optional ``"skip_global"`` flag that excludes the datasource from the global filters.
    `run` replaces each datasource's ``"dataset"`` entry with its transformed dataset.
    """
    # Private helper column used to route rows back to their datasource after the
    # global filters have run on the concatenated dataset; removed after splitting.
    _SOURCE_INDEX_COLUMN = "__pipeline_source_index__"

    def __init__(
        self,
        datasources # The datasources to be run
    ):
        self.datasources = datasources

    def run(
        self,
        global_filters=None, # Filters to be run at the dataset level rather than the example level
        global_cleaners=None, # Cleaners to be run at the dataset level rather than the example level
        cleaning_first=False, # Whether to run the cleaning transformations first
        globals_first=False, # Whether to run the global transformations first
    ):
        """
        Run the pipeline.

        Each datasource's filters and cleaners are applied to its first column, and the
        result is written back to ``datasource["dataset"]``. Global filters are called as
        ``f(dataset, column)`` and must return a dataset. They run once over the
        concatenation of every datasource not marked ``skip_global``: after the
        example-level stage by default, or before it when `globals_first` is true.
        `global_cleaners` is accepted but not implemented yet; a warning is logged.
        """
        # `None` defaults avoid shared mutable default lists; treat them as "no filters".
        global_filters = list(global_filters or [])
        if global_cleaners:
            logger.warning("`global_cleaners` is not implemented yet and will be ignored.")

        if globals_first and global_filters:
            self._run_global_filters(global_filters)
        for datasource in self.datasources:
            self._run_datasource(datasource, cleaning_first)
        if not globals_first and global_filters:
            self._run_global_filters(global_filters)

    @staticmethod
    def _apply_filters(dataset, column, filters):
        "Apply each example-level filter to `column` of `dataset` and return the filtered dataset."
        for f in filters:
            logger.info(f"Running filter: {f.__name__} on {column}")
            dataset = dataset.filter(
                lambda x: f(x[column]),
                num_proc=os.cpu_count(),
            )
        return dataset

    @staticmethod
    def _apply_cleaners(dataset, column, cleaners):
        "Apply each example-level cleaner to `column` of `dataset` and return the mapped dataset."
        for c in cleaners:
            logger.info(f"Running cleaner: {c.__name__} on {column}")
            dataset = dataset.map(
                lambda x: {column: c(x[column])},
                num_proc=os.cpu_count(),
            )
        return dataset

    def _run_datasource(self, datasource, cleaning_first):
        "Run one datasource's filters and cleaners and store the result back in the datasource."
        dataset = datasource["dataset"]
        column = datasource["columns"][0]
        logger.info(f"Running datasource: {dataset.builder_name}")
        if cleaning_first:
            dataset = self._apply_cleaners(dataset, column, datasource["cleaners"])
            dataset = self._apply_filters(dataset, column, datasource["filters"])
        else:
            dataset = self._apply_filters(dataset, column, datasource["filters"])
            dataset = self._apply_cleaners(dataset, column, datasource["cleaners"])
        # Persist the result; otherwise the transformations would be lost.
        datasource["dataset"] = dataset

    def _run_global_filters(self, global_filters):
        "Run `global_filters` over all non-skipped datasources at once, then split the result back."
        # Positions (in self.datasources) of the datasources taking part in this stage.
        indices = [
            i for i, d in enumerate(self.datasources)
            if not d.get("skip_global", False)
        ]
        if not indices:
            logger.info("Every datasource sets `skip_global`; skipping global filters.")
            return

        datasets = [self.datasources[i]["dataset"] for i in indices]
        # Use the column of the first datasource that actually takes part in this stage.
        global_column = self.datasources[indices[0]]["columns"][0]
        global_dataset = concatenate_datasets(datasets)

        # Tag every row with its dataset name ("meta_data", kept in the output as before)
        # and with its datasource position, which stays unambiguous even when several
        # datasources share the same builder name.
        source_col = self._SOURCE_INDEX_COLUMN  # local, so the lambda below does not capture `self`
        md, source_idx = [], []
        for i, d in zip(indices, datasets):
            md.extend([d.builder_name] * len(d))
            source_idx.extend([i] * len(d))
        meta_data = Dataset.from_dict({"meta_data": md, source_col: source_idx})
        global_dataset_with_meta = concatenate_datasets([global_dataset, meta_data], axis=1)

        # Run the global filters
        for f in global_filters:
            logger.info(f"Running global filter: {f.__name__}")
            global_dataset_with_meta = f(global_dataset_with_meta, global_column)

        # Split the dataset back up into the datasources it came from.
        # NOTE(review): assumes global filters keep the helper columns intact.
        for i in indices:
            self.datasources[i]["dataset"] = global_dataset_with_meta.filter(
                lambda x: x[source_col] == i,
                num_proc=os.cpu_count(),
            ).remove_columns(source_col)

In [None]:
#|echo: true
# Render the API documentation (signature and parameter table) for `Pipeline.run`.
show_doc(Pipeline.run)

---

[source](https://github.com/CarperAI/squeakily/blob/main/squeakily/core.py#L30){target="_blank" style="float:right; font-size:smaller"}

### Pipeline.run

>      Pipeline.run (global_filters=[], global_cleaners=[],
>                    cleaning_first=False, globals_first=False)

Run the pipeline.

|    | **Type** | **Default** | **Details** |
| -- | -------- | ----------- | ----------- |
| global_filters | list | [] | Filters to be run at the dataset level rather than the example level |
| global_cleaners | list | [] | Cleaners to be run at the dataset level rather than the example level |
| cleaning_first | bool | False | Whether to run the cleaning transformations first |
| globals_first | bool | False | Whether to run the global transformations first |

In [None]:
# Load 1% of the WikiText-103 training split as a small demo corpus.
ds = load_dataset("wikitext", "wikitext-103-v1", split="train[:1%]")
logger.info(f"Original dataset size: {len(ds)}")

# One datasource that filters and cleans the "text" column of `ds`;
# further datasources can be appended to this list.
datasources = [
    dict(
        dataset=ds,
        columns=["text"],
        filters=[check_char_repetition, check_flagged_words],
        cleaners=[remove_empty_lines, normalize_whitespace],
    ),
]

# Near-duplicate removal runs across all (non-skipped) datasources at once.
global_filters = [minhash_dedup]
pipeline = Pipeline(datasources)
pipeline.run(global_filters=global_filters)
final_ds = pipeline.datasources[0]["dataset"]
logger.info(f"Final dataset size: {len(final_ds)}")

#0:   0%|          | 0/563 [00:00<?, ?ex/s]
#0: 100%|██████████| 563/563 [00:00<00:00, 49229.54ex/s]


#1: 100%|██████████| 563/563 [00:00<00:00, 42426.84ex/s]



[A[A[A



#2: 100%|██████████| 563/563 [00:00<00:00, 31199.87ex/s]
#3: 100%|██████████| 563/563 [00:00<00:00, 37671.99ex/s]





#4: 100%|██████████| 563/563 [00:00<00:00, 30492.41ex/s][A[A[A[A[A[A







#6: 100%|██████████| 563/563 [00:00<00:00, 43463.89ex/s]







#7: 100%|██████████| 563/563 [00:00<00:00, 46253.76ex/s]








#8: 100%|██████████| 563/563 [00:00<00:00, 48583.34ex/s]









[A[A[A[A[A[A[A[A[A









#10: 100%|██████████| 563/563 [00:00<00:00, 47394.69ex/s]
#9: 100%|██████████| 563/563 [00:00<00:00, 25732.49ex/s]











#11: 100%|██████████| 563/563 [00:00<00:00, 30829.20ex/s]












#12: 100%|██████████| 563/563 [00:00<00:00, 38979.10ex/s][A[A[A[A[A[A[A[A[A[A[A[A[A













#13: 100%|██████████| 563/563 [00:00<00:00, 44119.22ex/s]














[A[A[A[A

Indexing signatures...: 100%|██████████| 18014/18014 [00:01<00:00, 16042.04it/s]


Constructing graph...: 100%|██████████| 7757/7757 [00:12<00:00, 637.20it/s]
Iterating over components...: 100%|██████████| 10560/10560 [00:00<00:00, 2971610.21it/s]


In [None]:
# Test the ability to skip global filters: `ds_1` sets "skip_global", so only `ds`
# takes part in the global (cross-datasource) filters.

ds_1 = load_dataset("wikitext", "wikitext-103-v1", split="train[:1%]")


datasources = [
    {
        "dataset": ds,
        "columns": ["text"],
        "filters": [check_char_repetition, check_flagged_words],
        "cleaners": [remove_empty_lines, normalize_whitespace],
        "skip_global": False,
    },
    {
        "dataset": ds_1,
        "columns": ["text"],
        "filters": [check_char_repetition, check_flagged_words],
        "cleaners": [remove_empty_lines, normalize_whitespace],
        "skip_global": True,
    },
]
pipeline = Pipeline(datasources)
pipeline.run(global_filters=global_filters)
logger.info(f"Final dataset size: {len(pipeline.datasources[0]['dataset'])}")

# Only datasources routed through the global stage carry the "meta_data" column it adds,
# so its absence shows that `ds_1` really skipped the global filters.
assert "meta_data" in pipeline.datasources[0]["dataset"].column_names
assert "meta_data" not in pipeline.datasources[1]["dataset"].column_names

Indexing signatures...: 100%|██████████| 18014/18014 [00:01<00:00, 16967.57it/s]


Constructing graph...: 100%|██████████| 7757/7757 [00:12<00:00, 609.50it/s]
Iterating over components...: 100%|██████████| 10560/10560 [00:00<00:00, 2443149.11it/s]


In [None]:
# Size of the first datasource, which takes part in the global filters.
kept_ds = pipeline.datasources[0]["dataset"]
logger.info(f"Final dataset size: {len(kept_ds)}")

In [None]:
# Size of the second datasource, which sets "skip_global" and is therefore
# excluded from the global filters.
skipped_ds = pipeline.datasources[1]["dataset"]
logger.info(f"Final dataset size: {len(skipped_ds)}")

In [None]:
#| hide
# Export the `#| export` cells of this notebook into the library module.
import nbdev
nbdev.nbdev_export()