# Detect text reuse between DNZ and MEGA texts using Passim

- **Input**: DNZ texts from `texts/DNZ_texts/` and MEGA texts from `texts/MEGA_texts/`
- **Output**: `passim_output`


In [44]:
import json
import os
import re
import shutil
import subprocess
from pathlib import Path
from typing import Any, Dict, Iterator, Optional

import ftfy
import orjsonl
from tqdm import tqdm

# Set working directory to project root
project_root = Path.cwd()
print(f"Working directory: {project_root}")

Working directory: /Users/ht8933/Documents/dev/remarx


## 1. Convert `text` files to `Passim`-friendly `JSONL` format


### Step 1: Convert `text` files to raw `JSONL` format

In [45]:
def convert_text_to_jsonl(text_dir, output_file, series_name):
    """Write every ``*.txt`` file of a directory as one JSONL record per file.

    Each record has the keys ``id`` (file name without its extension),
    ``text`` (file content with surrounding whitespace stripped) and
    ``series``.

    Args:
        text_dir: Directory searched (non-recursively) for ``*.txt`` files.
        output_file: Path of the JSONL file to create or overwrite.
        series_name: Value stored in the ``series`` field of every record.

    Returns:
        The number of records written.
    """
    text_dir = Path(text_dir)
    output_file = Path(output_file)
    count = 0
    with open(output_file, "w", encoding="utf-8") as f:
        # glob() yields entries in filesystem-dependent order; sort them so the
        # record order is reproducible across runs and machines.
        for txt_file in sorted(text_dir.glob("*.txt")):
            try:
                content = txt_file.read_text(encoding="utf-8").strip()
            except (OSError, UnicodeDecodeError) as e:
                # Best effort: report an unreadable file and keep going with
                # the remaining ones instead of aborting the whole conversion.
                print(f"Error processing {txt_file}: {e}")
                continue
            doc = {"id": txt_file.stem, "text": content, "series": series_name}
            f.write(json.dumps(doc, ensure_ascii=False) + "\n")
            count += 1
    print(f"Output written to: {output_file.resolve()}")
    return count


# DNZ / MEGA original JSONL
texts_json_for_passim_dir = Path("texts_json_for_passim")

# Recreate the directory from scratch so files from an earlier run do not linger.
if texts_json_for_passim_dir.exists():
    shutil.rmtree(texts_json_for_passim_dir)
texts_json_for_passim_dir.mkdir(exist_ok=True)

# (source directory, JSONL file name, series label) for each corpus
for source_dir, jsonl_name, series_label in (
    ("texts/DNZ_texts", "dnz_texts.jsonl", "dnz"),
    ("texts/MEGA_texts", "mega_texts.jsonl", "mega"),
):
    convert_text_to_jsonl(
        source_dir, texts_json_for_passim_dir / jsonl_name, series_label
    )

print("DNZ/MEGA texts have been converted to JSONL and saved in texts_json_for_passim/")
print(f"All JSONL files are located in: {texts_json_for_passim_dir.resolve()}")

Output written to: /Users/ht8933/Documents/dev/remarx/texts_json_for_passim/dnz_texts.jsonl
Output written to: /Users/ht8933/Documents/dev/remarx/texts_json_for_passim/mega_texts.jsonl
DNZ/MEGA texts have been converted to JSONL and saved in texts_json_for_passim/
All JSONL files are located in: /Users/ht8933/Documents/dev/remarx/texts_json_for_passim


### Step 2: Convert raw `JSONL` to Passim-friendly `JSONL` using the code adapted from `corppa`

In [46]:
def clean_text(text: str) -> str:
    """Normalize text so that matching passages are easier to find.

    The text is passed through ``ftfy.fix_text`` with NFKC normalization
    (HTML unescaping and encoding repair are switched off), and every run of
    whitespace is then collapsed into a single space.
    """
    fix_options = {
        "unescape_html": False,
        "fix_encoding": False,
        "normalization": "NFKC",
    }
    normalized = ftfy.fix_text(text, **fix_options)
    return re.sub(r"\s+", " ", normalized)


def transform_record(
    record: Dict[str, Any],
    corpus_name: str,
    id_field: str = "id",
    preserve_fields: bool = False,
    corpus_from_field: Optional[str] = None,
) -> Dict[str, Any]:
    """Build the passim-friendly version of a single record.

    The result always carries three keys:
      - id: the value of ``record[id_field]``
      - corpus: ``record[corpus_from_field]`` when that argument is given,
        otherwise the fixed ``corpus_name``
      - text: the cleaned ``record["text"]`` (a missing key is treated as "")

    With ``preserve_fields`` set, all original fields are copied into the
    result first and the three keys above then overwrite any same-named ones.

    Raises:
        ValueError: if ``id_field`` or ``corpus_from_field`` is absent from the
            record, or if fields are preserved and the record already has an
            ``id`` key that differs from ``id_field``.
    """
    if id_field not in record:
        raise ValueError(f"Record missing required id_field '{id_field}'")

    # Copying the record would silently overwrite an existing, unrelated "id".
    if preserve_fields and id_field != "id" and "id" in record:
        raise ValueError("Record already has 'id' while id_field != 'id'")

    passim_record: Dict[str, Any] = dict(record) if preserve_fields else {}
    passim_record["id"] = record[id_field]

    if not corpus_from_field:
        passim_record["corpus"] = corpus_name
    elif corpus_from_field in record:
        passim_record["corpus"] = record[corpus_from_field]
    else:
        raise ValueError(f"Record missing corpus_from_field '{corpus_from_field}'")

    passim_record["text"] = clean_text(record.get("text", ""))
    return passim_record


def build_passim_corpus(
    input_corpus: Path,
    corpus_name: str,
    id_field: str = "id",
    preserve_fields: bool = False,
    corpus_from_field: Optional[str] = None,
    show_progress: bool = True,
) -> Iterator[Dict[str, Any]]:
    """Lazily yield the passim-friendly form of each record in ``input_corpus``.

    Records are streamed from the file with ``orjsonl.stream`` and passed one
    at a time to ``transform_record`` together with the given options. A tqdm
    progress bar is shown unless ``show_progress`` is false.
    """
    transform_options = {
        "corpus_name": corpus_name,
        "id_field": id_field,
        "preserve_fields": preserve_fields,
        "corpus_from_field": corpus_from_field,
    }
    raw_records = tqdm(
        orjsonl.stream(input_corpus),
        desc="Transforming records",
        disable=not show_progress,
    )
    for raw_record in raw_records:
        yield transform_record(raw_record, **transform_options)


def save_passim_corpus(
    input_corpus: Path,
    output_corpus: Path,
    corpus_name: str,
    id_field: str = "id",
    preserve_fields: bool = True,
    corpus_from_field: Optional[str] = None,
    show_progress: bool = True,
) -> None:
    """Transform ``input_corpus`` and write the result to ``output_corpus`` as JSONL.

    The parent directory of ``output_corpus`` is created when missing. All
    remaining arguments are forwarded unchanged to ``build_passim_corpus``.
    """
    output_corpus.parent.mkdir(parents=True, exist_ok=True)
    # The generator is consumed by orjsonl.save as it writes the output file.
    transformed_records = build_passim_corpus(
        input_corpus=input_corpus,
        corpus_name=corpus_name,
        id_field=id_field,
        preserve_fields=preserve_fields,
        corpus_from_field=corpus_from_field,
        show_progress=show_progress,
    )
    orjsonl.save(output_corpus, transformed_records)


# Raw JSONL inputs and their passim-friendly counterparts
dnz_jsonl = texts_json_for_passim_dir / "dnz_texts.jsonl"
mega_jsonl = texts_json_for_passim_dir / "mega_texts.jsonl"
dnz_passim = texts_json_for_passim_dir / "dnz_passim.jsonl"
mega_passim = texts_json_for_passim_dir / "mega_passim.jsonl"

# Both corpora are converted with identical settings; the corpus label comes
# from each record's 'series' field (dnz / mega), so corpus_name is not used.
for raw_jsonl, passim_jsonl in ((dnz_jsonl, dnz_passim), (mega_jsonl, mega_passim)):
    save_passim_corpus(
        input_corpus=raw_jsonl,
        output_corpus=passim_jsonl,
        corpus_name="ignored_when_using_series",
        id_field="id",
        preserve_fields=True,
        corpus_from_field="series",
        show_progress=True,
    )

Transforming records: 24it [00:14,  1.70it/s]
Transforming records: 5it [00:00, 360.95it/s]


### Step 3: Combine the two passim input files into one

In [47]:
# Concatenate the two passim-friendly corpora into the single input file.
combined_passim_input = texts_json_for_passim_dir / "combined_passim_input.jsonl"
with open(combined_passim_input, "w", encoding="utf-8") as outf:
    for p in (dnz_passim, mega_passim):
        with open(p, "r", encoding="utf-8") as inf:
            # Stream line by line rather than loading a whole corpus into
            # memory, and make sure every record ends with a newline so the
            # last record of one file cannot fuse with the first of the next.
            for line in inf:
                outf.write(line if line.endswith("\n") else line + "\n")

print(f"Combined Passim input created: {combined_passim_input}")
print(f"File size: {combined_passim_input.stat().st_size / 1024 / 1024:.2f} MB")

Combined Passim input created: texts_json_for_passim/combined_passim_input.jsonl
File size: 72.05 MB


## 2. Run `Passim` on converted `JSONL` corpus

### Step 1: To run `Passim`, first configure the `JAVA_HOME` environment variable

In [48]:
# Change this to the JAVA_HOME path of your own Java installation.
# NOTE: a later cell reassigns `java_home` with the result of find_java_home("17").
java_home = "/opt/homebrew/opt/openjdk@17/libexec/openjdk.jdk/Contents/Home"

# If you do not know your `java_home` path, run the helper function below (it may not work on every system)

In [49]:
# Helper function to find JAVA_HOME


def find_java_home(prefer_version: str = "17") -> str | None:
    try:
        out = subprocess.check_output(
            ["/usr/libexec/java_home", "-v", prefer_version], text=True
        ).strip()
        if out and Path(out, "bin/java").exists():
            return out
    except Exception:
        pass

    # 2) 常见安装路径（Homebrew / Temurin / Zulu）
    candidates = [
        "/opt/homebrew/opt/openjdk@17/libexec/openjdk.jdk/Contents/Home",  # brew (Apple Silicon)
        "/usr/local/opt/openjdk@17/libexec/openjdk.jdk/Contents/Home",  # brew (Intel)
        "/Library/Java/JavaVirtualMachines/temurin-17.jdk/Contents/Home",  # Temurin
        "/Library/Java/JavaVirtualMachines/zulu-17.jdk/Contents/Home",  # Azul Zulu
    ]
    for p in candidates:
        if Path(p, "bin/java").exists():
            return p

    return None


java_home = find_java_home("17")
if not java_home:
    raise RuntimeError("Failed to find Java 17 installation.\n")

# Export the JDK location and put its bin directory first on PATH
java_bin_dir = Path(java_home) / "bin"
os.environ["JAVA_HOME"] = java_home
os.environ["PATH"] = f"{java_bin_dir}:{os.environ.get('PATH', '')}"

# Log the Java version; stderr is merged into the captured output
ver = subprocess.check_output(
    [java_bin_dir / "java", "-version"],
    stderr=subprocess.STDOUT,
    text=True,
)
print("JAVA_HOME =", java_home)
print(ver.splitlines()[0])

JAVA_HOME = /opt/homebrew/Cellar/openjdk@17/17.0.16/libexec/openjdk.jdk/Contents/Home
openjdk version "17.0.16" 2025-07-15


### Step 2: Configure the remaining settings for running `Passim`

### Step 3: Run `Passim` on converted corpus

In [None]:
# Build the paths from project_root instead of a machine-specific absolute
# path, so the cell works from any checkout of the project.
combined_input = project_root / "texts_json_for_passim" / "combined_passim_input.jsonl"
output_dir = project_root / "passim_output"

# Remove the output directory if it exists, so this run starts from a clean one
if output_dir.exists():
    shutil.rmtree(output_dir)

cmd_default = ["passim", str(combined_input), str(output_dir)]

print("🚀 Re-running Passim with default parameters:")
print("=" * 50)
print(f"Command: {' '.join(cmd_default)}")
print(f"Input: {combined_input}")
print(f"Output: {output_dir}")
print("\n⏱️ Starting... (this may take a few minutes)")

# Set environment variables for the Passim subprocess only (os.environ is copied)
env = os.environ.copy()
env.update(
    {
        "SPARK_LOCAL_IP": "127.0.0.1",
        "PYSPARK_DRIVER_PYTHON": "python3",  # use system python, not conda python
        "PYSPARK_PYTHON": "python3",
        "SPARK_DRIVER_MEMORY": "8g",
        "SPARK_EXECUTOR_MEMORY": "8g",
    }
)

# Assume failure until Passim has exited with return code 0.
success_default = False
try:
    result = subprocess.run(
        cmd_default, cwd=project_root, env=env, capture_output=True, text=True
    )
except Exception as e:
    # Notebook-level boundary: report the problem (e.g. the passim executable
    # could not be started) instead of surfacing a traceback.
    print(f"❌ Error running Passim: {e}")
else:
    if result.returncode == 0:
        print("✅ Passim completed successfully!")
        if result.stdout:
            print("📝 Output:", result.stdout[-500:])  # Last 500 characters
        success_default = True
    else:
        print(f"❌ Passim failed, return code: {result.returncode}")
        print("Error message:", result.stderr)
        if result.stdout:
            print("Standard output:", result.stdout)

if success_default:
    print(f"\n Success! Now check the {output_dir} directory")
else:
    print("\n Failed.")

🚀 使用默认参数重新运行 Passim:
命令: passim /Users/ht8933/Documents/dev/remarx/texts_json_for_passim/combined_passim_input.jsonl passim_output
输入: /Users/ht8933/Documents/dev/remarx/texts_json_for_passim/combined_passim_input.jsonl
输出: passim_output

⏱️ 开始运行... (这可能需要几分钟)


KeyboardInterrupt: 