In [None]:
from speakleash import Speakleash

# Shared SpeakLeash client used by the following cells.
# NOTE(review): the argument is presumably the local directory where datasets
# are downloaded/cached — confirm against the SpeakLeash documentation.
sl = Speakleash("../../datasets")

In [None]:
# Iterate over the whole corpus once: print a short preview of every entry and
# keep a running count so the total can be reported at the end.
# NOTE(review): if `.data` yields plain strings, `doc.title` is the bound
# `str.title` method rather than a document title — confirm against the
# SpeakLeash API (metadata may live on a different accessor).
data = sl.get("wolne_lektury_corpus").data
documents = 0  # stays 0 when the corpus is empty

for documents, doc in enumerate(data, start=1):
    print("---")
    print(f"Document number: {documents}")
    print(doc.title)
    print(doc[:40])  # first 40 items/characters of the entry

print(f"Total documents in wolne_lektury_corpus: {documents}")

In [None]:
import re
import json
import zstandard as zstd
from pathlib import Path

# Anything that looks like a markup tag: "<" ... ">" with no ">" in between.
html_pattern = re.compile(r"<[^>]+>")
# "ISBN" (any case), optional colons/whitespace, then a run of digits, X/x or hyphens.
isbn_pattern = re.compile(r"ISBN[:\s]*[\dXx\-]+", flags=re.IGNORECASE)

# Signatures of web-server error pages. Each entry is (expression, flags);
# the nginx version banner is the only case-sensitive one (flags=0).
error_patterns = [
    re.compile(expression, flags)
    for expression, flags in (
        (r"404\s+Not\s+Found", re.IGNORECASE),
        (r"503\s+Service\s+Unavailable", re.IGNORECASE),
        (r"nginx/[\d.]+", 0),
        (r"Error\s+\d+", re.IGNORECASE),
        (r"Access\s+Denied", re.IGNORECASE),
    )
]


def is_low_quality_document(doc):
    """Decide whether a parsed corpus record should be dropped.

    A record is considered low quality when any of the following holds:

    * it is not a dict, or its ``"text"`` value is missing, empty or not a string;
    * the stripped text is shorter than 50 characters;
    * the text matches one of the module-level ``error_patterns``;
    * it carries a ``"meta"`` dict and that dict reports ``quality == "LOW"``,
      a ``quality_ai["LOW"]`` score above 95, fewer than 10 words, zero verbs
      while having words, or a lexical density below 0.2.

    Malformed records (wrong types, ``null`` metadata or statistics) never
    raise: a non-dict ``meta`` is ignored, and a missing or non-numeric
    statistic falls back to the same default the check uses for a missing key.

    Args:
        doc: One decoded JSON record, expected to be a dict.

    Returns:
        bool: ``True`` if the record should be skipped, ``False`` otherwise.
    """
    if not isinstance(doc, dict):
        return True

    raw_text = doc.get("text")
    if not isinstance(raw_text, str):
        # Covers a missing key, null, and any non-text payload.
        return True

    text = raw_text.strip()
    if len(text) < 50:
        return True

    if any(pattern.search(text) for pattern in error_patterns):
        return True

    meta = doc.get("meta")
    if not isinstance(meta, dict):
        # No usable metadata: nothing further to judge the record by.
        return False

    def _stat(key, default):
        # Numeric statistic from meta; null / non-numeric values count as missing.
        value = meta.get(key, default)
        return value if isinstance(value, (int, float)) else default

    if meta.get("quality") == "LOW":
        return True

    quality_ai = meta.get("quality_ai")
    if isinstance(quality_ai, dict):
        low_score = quality_ai.get("LOW", 0)
        # NOTE(review): the threshold of 95 presumes scores on a 0-100 scale;
        # if they are probabilities in [0, 1] this check never fires — confirm.
        if isinstance(low_score, (int, float)) and low_score > 95:
            return True

    # A meta dict without a word count is treated as 0 words (i.e. rejected).
    words = _stat("words", 0)
    if words < 10:
        return True

    if _stat("verbs", 0) == 0 and words > 0:
        return True

    # A missing lexical density defaults to 1.0, which passes the check.
    return _stat("lexical_density", 1.0) < 0.2


def clean_text(text):
    """Remove tag-like markup and ISBN references, then normalise whitespace.

    Args:
        text: Value to clean. ``None`` yields an empty string; any other
            non-string value is converted with ``str()`` first.

    Returns:
        str: The text with every ``html_pattern`` and ``isbn_pattern`` match
        deleted, each whitespace run collapsed to a single space, and leading
        and trailing whitespace removed.
    """
    if text is None:
        # str(None) would write the literal word "None" into the cleaned corpus.
        return ""

    if not isinstance(text, str):
        text = str(text)

    text = html_pattern.sub("", text)
    text = isbn_pattern.sub("", text)
    # Collapse whitespace last so gaps left by the removals above are closed too.
    text = re.sub(r"\s+", " ", text).strip()

    return text


def _iter_jsonl_lines(reader, chunk_size=1024 * 1024):
    """Yield newline-separated byte lines from ``reader``, including a final unterminated one."""
    pending = b""

    while True:
        chunk = reader.read(chunk_size)
        if not chunk:
            break

        pending += chunk
        # Everything before the last b"\n" is complete; the tail may continue
        # in the next chunk.
        *complete, pending = pending.split(b"\n")
        yield from complete

    if pending:
        # The stream did not end with a newline: the tail is still a full line.
        yield pending


def process(input_path, output_path, text_field="text"):
    """Filter and clean a zstd-compressed JSONL corpus into a plain JSONL file.

    Every non-blank line of the decompressed input is parsed as JSON. Lines
    that fail to decode, are not JSON objects, or are rejected by
    ``is_low_quality_document`` are skipped. For the remaining records the
    ``text_field`` value and the ``"title"`` value (when present) are passed
    through ``clean_text`` and the record is written as one UTF-8 JSON line.

    Progress is printed every 1000 processed lines and the final totals are
    printed when the run finishes.

    NOTE(review): ``is_low_quality_document`` always inspects the ``"text"``
    key, so a non-default ``text_field`` affects cleaning only, not filtering.

    Args:
        input_path: Path of the ``.jsonl.zst`` input file.
        output_path: Path of the uncompressed JSONL file to (over)write.
        text_field: Key of the record field holding the main text.

    Returns:
        None
    """
    input_file = Path(input_path)
    output_file = Path(output_path)

    processed_docs = 0
    skipped_docs = 0
    written_docs = 0

    with open(input_file, "rb") as ifh, open(output_file, "w", encoding="utf-8") as ofh:
        with zstd.ZstdDecompressor().stream_reader(ifh) as reader:
            for line in _iter_jsonl_lines(reader):
                if not line.strip():
                    continue

                processed_docs += 1

                try:
                    doc = json.loads(line)
                except (json.JSONDecodeError, UnicodeDecodeError):
                    # Undecodable bytes or invalid JSON: count the line as skipped.
                    doc = None

                if not isinstance(doc, dict) or is_low_quality_document(doc):
                    skipped_docs += 1
                else:
                    if text_field in doc:
                        doc[text_field] = clean_text(doc[text_field])
                    if "title" in doc:
                        doc["title"] = clean_text(doc["title"])

                    ofh.write(json.dumps(doc, ensure_ascii=False) + "\n")
                    written_docs += 1

                # Report on every 1000th line, whether it was kept or skipped.
                if processed_docs % 1000 == 0:
                    print(
                        f"Processed: {processed_docs} | Written: {written_docs} | Skipped: {skipped_docs}"
                    )

    print(
        f"Final totals -> Processed: {processed_docs} | Written: {written_docs} | Skipped: {skipped_docs}"
    )
    print("Processing Complete!")


# Run the cleaning pipeline: read the zstd-compressed corpus and write the
# filtered, cleaned records as an uncompressed JSONL file next to it.
# Both paths are relative to the notebook's working directory.
input_path = "../../datasets/wolne_lektury_corpus.jsonl.zst"
output_path_uncompressed = "../../datasets/wolne_lektury_corpus_cleaned.jsonl"
process(input_path, output_path_uncompressed)

Processed: 1000 | Written: 488 | Skipped: 512
Processing Complete!
Processing Complete!
