In [None]:
import os
import subprocess

import click
import datasets

from text_dedup.minhash import main as minhash_main
from text_dedup.utils import IOArgs
from text_dedup.utils import MetaArgs
from text_dedup.utils import MinHashArgs
from text_dedup.utils.timer import Timer

# Scratch directories: one holds the dataset in `datasets` on-disk format,
# the other holds a parquet copy that is passed to the Spark script.
output_path_ds = "temp_input_ds"
output_path_spark = "temp_input_spark"

# Load the "gl" configuration of OSCAR-2201 (train split) and persist it so
# the in-process MinHash run can read it from a local path.
dataset = datasets.load_dataset(
    "oscar-corpus/OSCAR-2201",
    "gl",
    split="train",
    trust_remote_code=True,
)
dataset.save_to_disk(output_path_ds)

# Write the same records as a single parquet file for the Spark run.
os.makedirs(output_path_spark, exist_ok=True)
dataset.to_pandas().to_parquet(f"{output_path_spark}/data.parquet")
# ---- Settings shared by both runs ----
NUM_PROC = 4
column = "text"

# ---- MinHash parameters (passed identically to both implementations) ----
num_perm = 250
ngram = 5
min_length = 0
threshold = 0.7

# Collects the named timings for each run.
t = Timer()

# Column to deduplicate on and the batch size handed to text_dedup.
meta_args = MetaArgs(column=column, batch_size=10000)

# I/O settings for the in-process run: read the dataset saved on disk and
# write the result under ./output_minhash.
io_args = IOArgs(
    path=output_path_ds,
    local=True,
    num_proc=NUM_PROC,
    cache_dir=".cache",
    output="./output_minhash",
    debug=True,
    clean_cache=True,
)

# Run the in-process MinHash implementation under its own timer.
with t("MinHash"):
    ctx = click.Context(minhash_main)
    minhash_args = MinHashArgs(num_perm=num_perm, ngram=ngram, min_length=min_length, threshold=threshold)
    io_args.output = minhash_output = "./output_minhash"
    ctx.invoke(
        minhash_main,
        io_args=io_args,
        meta_args=meta_args,
        minhash_args=minhash_args,
    )

# Run the Spark implementation under a *sibling* timer. It must not be nested
# inside the "MinHash" block, otherwise the "MinHash" timing would also
# include the whole spark-submit run and the comparison would be skewed.
with t("MinHash Spark"):
    spark_output = "./output_spark"
    # One option per line for readability; flattened into argv tokens below.
    spark_args = f"""
    spark-submit --executor-memory 8g
        --driver-memory 8g
        --executor-cores 2
        --num-executors 2
        --packages graphframes:graphframes:0.8.2-spark3.2-s_2.12
        --conf spark.executor.extraJavaOptions=-Dlog4j.configuration=../log4j.properties
        --conf spark.driver.extraJavaOptions=-Dlog4j.configuration=../log4j.properties
        ../text_dedup/minhash_spark.py
        --input ./{output_path_spark}
        --output {spark_output}
        --column {column}
        --threshold {threshold}
        --min_length {min_length}
        --num_perm {num_perm}
        --ngram {ngram}
        --debug
    """.split("\n")
    # str.split() with no argument splits on any whitespace and drops empty
    # tokens, so no explicit strip/filter is needed.
    spark_cmd = [token for line in spark_args for token in line.split()]
    # check=True: raise CalledProcessError if spark-submit exits non-zero,
    # instead of silently recording a timing for a failed run.
    subprocess.run(spark_cmd, check=True)  # nosec

You can avoid this message in future by passing the argument `trust_remote_code=True`.
Passing `trust_remote_code=True` will be mandatory to load this dataset from the next major release of `datasets`.


Saving the dataset (0/1 shards):   0%|          | 0/88803 [00:00<?, ? examples/s]

Map (num_proc=4):   0%|          | 0/88803 [00:00<?, ? examples/s]

Filter (num_proc=4):   0%|          | 0/88803 [00:00<?, ? examples/s]

Fingerprinting... (num_proc=4):   0%|          | 0/88803 [00:00<?, ? examples/s]

Iterating MinHashes...: 100%|██████████| 9/9 [00:02<00:00,  3.08it/s]


Clustering...: 100%|██████████| 25/25 [00:01<00:00, 23.55it/s]


Finding clusters... (num_proc=4):   0%|          | 0/88803 [00:00<?, ? examples/s]

Filtering clusters... (num_proc=4):   0%|          | 0/88803 [00:00<?, ? examples/s]

Saving the dataset (0/1 shards):   0%|          | 0/44099 [00:00<?, ? examples/s]

:: loading settings :: url = jar:file:/Users/chenghao/miniforge3/envs/dedup/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/chenghao/.ivy2/cache
The jars for the packages stored in: /Users/chenghao/.ivy2/jars
graphframes#graphframes added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-481f3b0b-867d-4704-a87e-be5699d67bbd;1.0
	confs: [default]
	found graphframes#graphframes;0.8.2-spark3.2-s_2.12 in spark-packages
	found org.slf4j#slf4j-api;1.7.16 in central
:: resolution report :: resolve 94ms :: artifacts dl 3ms
	:: modules in use:
	graphframes#graphframes;0.8.2-spark3.2-s_2.12 from spark-packages in [default]
	org.slf4j#slf4j-api;1.7.16 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
	----------------------------

DEBUG __main__ - ------------------------------------------------------------------------------------------------------------------------
DEBUG __main__ - Using B=25, R=10
DEBUG __main__ - Loaded documents: 88803
DEBUG __main__ - args.input='./temp_input_spark'
DEBUG __main__ - args.output='./output_spark'
DEBUG __main__ - args.threshold=0.7
DEBUG __main__ - args.ngram_size=5
DEBUG __main__ - args.min_length=0
DEBUG __main__ - args.num_perm=250
DEBUG __main__ - args.column='text'
DEBUG __main__ - id                                                              : bigint
DEBUG __main__ - text                                                            : string
DEBUG __main__ - meta                                                            : struct<annotations:array<string>,identification:struct<label:string,prob:double>,line_identifications:array<struct<label:string,prob:double>>,warc_headers:struct<content-length:bigint,content-type:string,warc-block-digest:string,warc-date:string,warc-i