cell 1: Setup: Imports & configuration

In [2]:

import sys, subprocess, importlib

def _ensure(pkg, import_name=None, pip_name=None):
    name = import_name or pkg
    try:
        return importlib.import_module(name)
    except Exception:
        subprocess.check_call([sys.executable, "-m", "pip", "install", "--quiet", pip_name or pkg])
        return importlib.import_module(name)

# Bind each third-party module to its conventional alias, installing it on demand.
# Arguments are (distribution name, import name, pip requirement).
yaml = _ensure("pyyaml", "yaml", "pyyaml")
np   = _ensure("numpy", "numpy", "numpy")
pd   = _ensure("pandas", "pandas", "pandas")
# `tqdm` is the module here; the later `from tqdm import tqdm` rebinds the name
# to the progress-bar class.
tqdm = _ensure("tqdm", "tqdm", "tqdm")
# NOTE(review): the mining cell imports fpgrowth from mlxtend, which is not
# ensured here — confirm whether `fim` is still needed.
fim  = _ensure("pyfim", "fim", "pyfim==6.28")   # provides fpgrowth (fpmax is NOT exposed)

# --- configuration ---
import os, json, math, glob, itertools
from pathlib import Path
from collections import Counter, defaultdict
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple
from tqdm import tqdm

# Dataset location (searched recursively with GLOB_PATTERN)
# NOTE(review): absolute, machine-specific path — adjust before running elsewhere.
DATA_DIR = Path("/Users/ankita/Desktop/V3_DatasetTest_Valid")
GLOB_PATTERN = "**/*.Pipeline"

# Mining thresholds
MIN_SUPPORT_RATIO = 0.10    # an itemset must appear in >=10% of kept files
MIN_ITEMSET_LEN   = 1       # set 2 to skip singletons
MAX_ITEMSET_LEN   = 10      # upper bound on itemset size passed to the miner

# Feature extraction knobs
# NOTE: the feature-extraction cells re-assign several of these
# (KEEP_NUMERIC_VALUES, LOWERCASE_VALUES, MAX_VALUE_LEN, INCLUDE_*, WILDCARD);
# the last cell executed wins.
LOWERCASE_VALUES        = True
MAX_VALUE_LEN           = 120
KEEP_NUMERIC_VALUES     = True
INCLUDE_EXISTS_FEATURES = True
INCLUDE_EQ_FEATURES     = True
WILDCARD = "[]"          # list wildcard

# Vocabulary pruning
MIN_DOC_FREQ       = 50      # keep only features that appear in at least this many documents
MAX_DOC_FREQ_RATIO = 0.98    # drop features that appear in more than this fraction of documents
TOP_K_FEATURES     = None    # None/0 = no cap; otherwise keep the K most frequent surviving features

# Verification/outputs
BATCH_VERIFY = 500
# NOTE(review): absolute, machine-specific path; created on import of this cell.
SAVE_DIR = Path("/Users/ankita/Desktop/Thesis-work/pattern_outputs2")
SAVE_DIR.mkdir(parents=True, exist_ok=True)

# Choose: "all" (frequent itemsets), "maximal" (post-filter), or "closed" (post-filter)
MINING_MODE = "all"   # options: "all" | "maximal" | "closed"


# NOTE(review): the mining cell imports fpgrowth from mlxtend, not PyFIM — confirm this message.
print("Setup complete. Using PyFIM fpgrowth; 'maximal' mode will post-filter to FP-Max.")


Setup complete. Using PyFIM fpgrowth; 'maximal' mode will post-filter to FP-Max.


cell 2: Run this cell (uncomment it first if it is commented out) when re-running on a new dataset. It deletes the output files left by previous runs so they don't interfere with the new results.

In [4]:
import shutil, os
# Delete artefacts of earlier runs. The list includes the files the
# verification cell reloads ("verified_patterns.jsonl" and the
# "verified_patterns_maximal_part_*.jsonl" parts); leaving those behind would
# make that cell reuse stale results instead of verifying the new itemsets.
for p in (SAVE_DIR/"verified_patterns_summary.csv",
          SAVE_DIR/"verified_patterns_maximal.jsonl",
          SAVE_DIR/"verified_patterns.jsonl"):
    try: os.remove(p)
    except FileNotFoundError: pass   # nothing to clean up for this file
for pattern in ("verified_patterns_part_*.jsonl",
                "verified_patterns_maximal_part_*.jsonl"):
    for f in SAVE_DIR.glob(pattern):
        f.unlink()


Cell 3 — Discover files & sanity preview

In [6]:
# %% [markdown]
# ## Discover files & read

# Collect every file under DATA_DIR matching GLOB_PATTERN; sorting gives a
# stable processing order between runs.
all_files = sorted(DATA_DIR.glob(GLOB_PATTERN))
print(f"Discovered .Pipeline files: {len(all_files):,}")
# Show the first few paths as a quick sanity check.
for p in all_files[:5]:
    print(">", p)

def peek_text(p: Path, n=20):
    """Print up to the first ``n`` lines of ``p`` with trailing whitespace removed.

    The file is decoded as UTF-8 with undecodable bytes ignored. Any exception
    raised while opening or reading is reported on a ``read error:`` line
    instead of being propagated.
    """
    try:
        with p.open("r", encoding="utf-8", errors="ignore") as handle:
            # Pairing with range(n) stops after n lines without reading further.
            line_budget = range(n)
            for _, raw_line in zip(line_budget, handle):
                print(raw_line.rstrip())
    except Exception as exc:
        print("read error:", exc)

# Preview the head of the first discovered file (skipped when nothing was found).
if all_files:
    print("\n--- Preview:", all_files[0], "---")
    peek_text(all_files[0], n=20)


Discovered .Pipeline files: 29,069
> /Users/ankita/Desktop/V3_DatasetTest_Valid/0-vortex_github-actions-dependent-jobs-example_contents_.github_workflows_deploy.Pipeline
> /Users/ankita/Desktop/V3_DatasetTest_Valid/0003c660988cf730c6feedcce9806ef3c4432cf0.Pipeline
> /Users/ankita/Desktop/V3_DatasetTest_Valid/00143f50002a05b8faad1fbb93ce3e0d85bde964.Pipeline
> /Users/ankita/Desktop/V3_DatasetTest_Valid/002e6e41de6f334b7fdd2714979028ea118b1ff1.Pipeline
> /Users/ankita/Desktop/V3_DatasetTest_Valid/006imran_event-dispatcher-workflows_contents_.github_workflows_push.Pipeline

--- Preview: /Users/ankita/Desktop/V3_DatasetTest_Valid/0-vortex_github-actions-dependent-jobs-example_contents_.github_workflows_deploy.Pipeline ---
name: Node CI/CD
on:
  push:
    branches: [main]
  pull_request:
jobs:
  build:
    runs-on: ubuntu-latest
    steps:
    - name: "Checkout repository"
      uses: actions/checkout@v2
    - name: "Setup Node"
      uses: actions/setup-node@v2
      with:
        node-ver

Cell 4 — YAML parsing + feature extraction (with list wildcards)

Feature scheme

exists:path for any mapping key / scalar path (e.g., exists:on.push, exists:jobs.build.steps.[].uses)

eq:path==value for scalar equality (e.g., eq:jobs.build.runs-on==ubuntu-latest)

Lists become a separate [] path segment that stands for any index (wildcard), e.g. jobs.build.steps.[].name.

In [8]:
# Feature extraction without normalization
from pathlib import Path
from typing import Any, List, Set, Tuple
import itertools
import yaml
from tqdm import tqdm

# ------------ CONFIG ------------
# Re-assigns the flags from the setup cell. process_file() unions the two
# feature families according to these switches, so with both False every
# file yields an empty feature set.
INCLUDE_EXISTS_FEATURES = True
INCLUDE_EQ_FEATURES     = True

# Lists use a wildcard "[]" segment meaning "exists in ANY element"
WILDCARD = "[]"

# ----- YAML loader that avoids "on:" -> True -----
class NoBoolSafeLoader(yaml.SafeLoader):
    """SafeLoader variant that never resolves plain scalars to booleans.

    YAML 1.1 treats ``on``/``off``/``yes``/``no`` as booleans, which would turn
    the workflow key ``on:`` into ``True``. Without the bool resolver such
    scalars stay plain strings.
    """

    # Give the subclass its OWN resolver table. ``yaml_implicit_resolvers`` is
    # inherited by reference, so editing it in place through the subclass
    # would also remove bool resolution from ``yaml.SafeLoader`` (and every
    # other loader sharing the table) for the whole process.
    yaml_implicit_resolvers = {
        first_char: [(tag, regexp) for tag, regexp in resolvers
                     if tag != 'tag:yaml.org,2002:bool']
        for first_char, resolvers in yaml.SafeLoader.yaml_implicit_resolvers.items()
    }

def _yaml_load(text: str) -> Any:
    """Parse ``text`` with ``NoBoolSafeLoader``; return ``None`` on any failure."""
    try:
        parsed = yaml.load(text, Loader=NoBoolSafeLoader)
    except Exception:
        # Unparsable input is reported as "no document" rather than raised.
        return None
    return parsed

# ----- PURE DFS FEATURE EXTRACTION (NO NORMALIZATION) -----
def _walk(node: Any, prefix: List[str], out_exists: Set[str], out_eq: Set[str]):
    """
    DFS over YAML:
      - emits exists:path for every visited path
      - emits eq:path==value for scalar values (raw, no normalization)
    """
    if isinstance(node, dict):
        for k, v in node.items():
            key = str(k)  # no strip/lower/etc.
            p2 = prefix + [key]
            out_exists.add("exists:" + ".".join(p2))
            _walk(v, p2, out_exists, out_eq)

    elif isinstance(node, list):
        # list-level wildcard path
        p2 = prefix + [WILDCARD]
        out_exists.add("exists:" + ".".join(p2))
        for v in node:
            _walk(v, p2, out_exists, out_eq)

    else:
        # scalar value (string, int, float, bool, None, etc.)
        path = ".".join(prefix) if prefix else ""
        if path:
            out_exists.add("exists:" + path)
            # raw value as string (no length limit / lowercasing)
            val = str(node)
            out_eq.add(f"eq:{path}=={val}")

def extract_features(text: str) -> Tuple[Set[str], Set[str]]:
    """Return ``(exists_features, eq_features)`` for the YAML in ``text``.

    Both sets are empty when ``_yaml_load`` yields ``None``.
    """
    exists_feats: Set[str] = set()
    eq_feats: Set[str] = set()
    document = _yaml_load(text)
    if document is not None:
        _walk(document, [], exists_feats, eq_feats)
    return exists_feats, eq_feats

def process_file(p: Path) -> Tuple[str, Set[str], bool]:
    """Read one pipeline file and return ``(filepath, features, had_error)``.

    Runs in a worker. ``had_error`` is True when the file cannot be read or
    when ``_yaml_load`` returns ``None`` (unparsable or empty document), so
    the "Read/parse errors" line of the extraction summary covers both cases.
    The feature set is empty whenever ``had_error`` is True; otherwise it is
    the union of the families enabled by ``INCLUDE_EXISTS_FEATURES`` and
    ``INCLUDE_EQ_FEATURES``.
    """
    try:
        txt = p.read_text(encoding="utf-8", errors="ignore")
    except Exception:
        return str(p), set(), True
    data = _yaml_load(txt)
    if data is None:
        # Flag the failure instead of reporting a successful, empty extraction;
        # otherwise parse errors never show up in the error count.
        return str(p), set(), True
    ex, eq = set(), set()
    _walk(data, [], ex, eq)
    feats = set()
    if INCLUDE_EXISTS_FEATURES:
        feats |= ex
    if INCLUDE_EQ_FEATURES:
        feats |= eq
    return str(p), feats, False

# ---- runner that prefers processes and falls back to threads ----
def run_extraction(all_files: List[Path]):
    """Run ``process_file`` over ``all_files`` and print a short summary.

    A process pool is tried first; if anything in that path raises, the work
    is redone from scratch on a thread pool.

    Args:
        all_files: paths of the files to process.

    Returns:
        List of ``(filepath, feature_set)`` tuples, one per input file, in the
        same order as ``all_files`` regardless of which pool was used.
    """
    import os
    extraction: List[Tuple[str, Set[str]]] = []
    parse_errors = 0
    print(f"Extracting features from {len(all_files):,} files...")

    try:
        import multiprocessing as mp
        try:
            mp.set_start_method("fork", force=True)
        except RuntimeError:
            pass  # already set; OK
        N_WORKERS = max((os.cpu_count() or 2) - 1, 1)
        with mp.Pool(processes=N_WORKERS) as pool:
            for fn, feats, err in tqdm(
                pool.imap(process_file, all_files, chunksize=64),
                total=len(all_files), desc="Extracting"
            ):
                extraction.append((fn, feats))
                if err:
                    parse_errors += 1
        used = f"processes (n={N_WORKERS})"
    except Exception as e:
        from concurrent.futures import ThreadPoolExecutor
        print(f"[warn] Process pool failed ({type(e).__name__}: {e}); falling back to threads.")
        # The pool may have delivered some results before failing; drop them so
        # no file is recorded (or counted as an error) twice.
        extraction.clear()
        parse_errors = 0
        max_workers = max(8, (os.cpu_count() or 8) * 4)
        with ThreadPoolExecutor(max_workers=max_workers) as exr:
            # Executor.map yields results in input order, matching pool.imap.
            for fn, feats, err in tqdm(
                exr.map(process_file, all_files),
                total=len(all_files), desc="Extracting"
            ):
                extraction.append((fn, feats))
                if err:
                    parse_errors += 1
        used = f"threads (n≈{max_workers})"

    # Diagnostics
    nonempty = [(fn, f) for fn, f in extraction if f]
    empty    = len(extraction) - len(nonempty)
    print(f"\nExtraction summary [{used}]:")
    print(f"  Total files       : {len(extraction):,}")
    print(f"  Non-empty feature : {len(nonempty):,}")
    print(f"  Empty feature     : {empty:,}")
    print(f"  Read/parse errors : {parse_errors:,}")

    print("\nExample features from first NON-empty file:")
    if nonempty:
        fn, feats = nonempty[0]
        print(">", Path(fn).name, "->", len(feats), "features | sample:",
              list(itertools.islice(sorted(feats), 8)))
    else:
        print("(!) All empty — check INCLUDE_* flags, DATA_DIR, and YAML validity.")

    return extraction

# ---- main entry point ----
# Executed when the cell runs as the main module: rebuilds the file list and
# runs the raw (un-normalized) extraction defined in this cell.
# NOTE(review): DATA_DIR / GLOB_PATTERN repeat the values from the setup cell.
if __name__ == "__main__":
    # Dataset location
    DATA_DIR = Path("/Users/ankita/Desktop/V3_DatasetTest_Valid")
    GLOB_PATTERN = "**/*.Pipeline"

    all_files = sorted(DATA_DIR.glob(GLOB_PATTERN))
    extraction = run_extraction(all_files)


Extracting features from 29,069 files...


Extracting: 100%|███████████████████████| 29069/29069 [00:12<00:00, 2418.19it/s]


Extraction summary [processes (n=7)]:
  Total files       : 29,069
  Non-empty feature : 29,069
  Empty feature     : 0
  Read/parse errors : 0

Example features from first NON-empty file:
> 0-vortex_github-actions-dependent-jobs-example_contents_.github_workflows_deploy.Pipeline -> 33 features | sample: ['eq:jobs.build.runs-on==ubuntu-latest', "eq:jobs.build.steps.[].if==github.ref == 'refs/heads/main' && github.event_name == 'push'", 'eq:jobs.build.steps.[].name==Checkout repository', 'eq:jobs.build.steps.[].name==Install dependencies', 'eq:jobs.build.steps.[].name==Install npm@7', 'eq:jobs.build.steps.[].name==Release', 'eq:jobs.build.steps.[].name==Setup Node', 'eq:jobs.build.steps.[].run==git config --global user.name test2\ngit config --global user.email "test@users.noreply.github.com"\nnpm run release\ngit status\n']





In [9]:

# Emits two feature types:
#   - exists:path
#   - eq:path==value
# We use a YAML loader that avoids "on:" -> True coercion.

from pathlib import Path
from typing import Any, List, Set, Tuple, Optional
import itertools
import yaml
from tqdm import tqdm

# ------------ CONFIG (at least one must be True, or you'll get 0 features) ------------
INCLUDE_EXISTS_FEATURES = True
INCLUDE_EQ_FEATURES     = True

# Value normalization knobs (read by _norm_value)
# NOTE: these re-assign the setup-cell values (KEEP_NUMERIC_VALUES was True,
# MAX_VALUE_LEN was 120); the values below are the ones in effect from here on.
KEEP_NUMERIC_VALUES = False   # int/float scalars produce no eq: feature
LOWERCASE_VALUES    = True    # string values are lower-cased after stripping
MAX_VALUE_LEN       = 80      # longer (or empty) string values produce no eq: feature

# Lists use a wildcard "[]" segment meaning "exists in ANY element"
WILDCARD = "[]"
# -------------------------------------------------------------------------

# YAML loader that prevents YAML 1.1 bool coercion (so "on:" won't become True)
class NoBoolSafeLoader(yaml.SafeLoader):
    """SafeLoader variant that never resolves plain scalars to booleans.

    Prevents YAML 1.1 bool coercion, so the workflow key ``on:`` stays the
    string ``"on"`` instead of becoming ``True``.
    """

    # Build a resolver table private to this class. The inherited
    # ``yaml_implicit_resolvers`` dict is shared by reference, so assigning
    # into it through the subclass would also disable bool resolution for
    # ``yaml.SafeLoader`` and every other loader using the same table.
    yaml_implicit_resolvers = {
        first_char: [(tag, regexp) for tag, regexp in resolvers
                     if tag != 'tag:yaml.org,2002:bool']
        for first_char, resolvers in yaml.SafeLoader.yaml_implicit_resolvers.items()
    }

def _yaml_load(text: str) -> Any:
    """Parse ``text`` with ``NoBoolSafeLoader``; any exception yields ``None``."""
    try:
        document = yaml.load(text, Loader=NoBoolSafeLoader)
    except Exception:
        # Treat as unparsable: callers only distinguish "document" from None.
        return None
    return document

def _norm_key(k: Any) -> str:
    return k.strip() if isinstance(k, str) else str(k)

def _norm_value(v: Any) -> Optional[str]:
    if v is None:
        return "null"
    if isinstance(v, bool):
        return "true" if v else "false"
    if isinstance(v, (int, float)) and KEEP_NUMERIC_VALUES:
        return str(v)
    if isinstance(v, str):
        s = v.strip()
        if LOWERCASE_VALUES:
            s = s.lower()
        if 0 < len(s) <= MAX_VALUE_LEN:
            return s
        return None
    return None

def _walk(node: Any, prefix: List[str], out_exists: Set[str], out_eq: Set[str]):
    # DFS over YAML; emit exists:* for every path and eq:* for scalars with kept values
    if isinstance(node, dict):
        for k, v in node.items():
            key = _norm_key(k)
            p2 = prefix + [key]
            out_exists.add("exists:" + ".".join(p2))
            _walk(v, p2, out_exists, out_eq)
    elif isinstance(node, list):
        p2 = prefix + [WILDCARD]
        out_exists.add("exists:" + ".".join(p2))
        for v in node:
            _walk(v, p2, out_exists, out_eq)
    else:
        path = ".".join(prefix) if prefix else ""
        if path:
            out_exists.add("exists:" + path)
        val = _norm_value(node)
        if path and val is not None:
            out_eq.add(f"eq:{path}=={val}")

def extract_features(text: str) -> Tuple[Set[str], Set[str]]:
    """Parse ``text`` as YAML and return ``(exists_features, eq_features)``.

    Two empty sets are returned when ``_yaml_load`` yields ``None``.
    """
    exists_feats: Set[str] = set()
    eq_feats: Set[str] = set()
    document = _yaml_load(text)
    if document is not None:
        _walk(document, [], exists_feats, eq_feats)
    return exists_feats, eq_feats

def process_file(p: Path) -> Tuple[str, Set[str], bool]:
    """Read one pipeline file and return ``(filepath, features, had_error)``.

    Runs in a worker. ``had_error`` is True when the file cannot be read or
    when ``_yaml_load`` returns ``None`` (unparsable or empty document), so
    the "Read/parse errors" line of the extraction summary covers both cases.
    The feature set is empty whenever ``had_error`` is True; otherwise it is
    the union of the families enabled by ``INCLUDE_EXISTS_FEATURES`` and
    ``INCLUDE_EQ_FEATURES``.
    """
    try:
        txt = p.read_text(encoding="utf-8", errors="ignore")
    except Exception:
        return str(p), set(), True
    data = _yaml_load(txt)
    if data is None:
        # Flag the failure instead of reporting a successful, empty extraction;
        # otherwise parse errors never show up in the error count.
        return str(p), set(), True
    ex, eq = set(), set()
    _walk(data, [], ex, eq)
    feats = set()
    if INCLUDE_EXISTS_FEATURES:
        feats |= ex
    if INCLUDE_EQ_FEATURES:
        feats |= eq
    return str(p), feats, False

# ---- runner that prefers processes (fast) and falls back to threads if pickling fails
def run_extraction(all_files: List[Path]):
    """Run ``process_file`` over ``all_files`` and print a short summary.

    A process pool is tried first; if anything in that path raises (including
    pickling problems), the work is redone from scratch on a thread pool.

    Args:
        all_files: paths of the files to process.

    Returns:
        List of ``(filepath, feature_set)`` tuples, one per input file, in the
        same order as ``all_files`` regardless of which pool was used.
    """
    import os
    extraction: List[Tuple[str, Set[str]]] = []
    parse_errors = 0
    print(f"Extracting features from {len(all_files):,} files...")

    # Try processes with a start method that avoids spawn pickle issues
    try:
        import multiprocessing as mp
        # Request 'fork'; a RuntimeError from set_start_method is ignored, any
        # other failure is handled by the thread fallback below.
        try:
            mp.set_start_method("fork", force=True)
        except RuntimeError:
            pass  # already set; OK
        N_WORKERS = max((os.cpu_count() or 2) - 1, 1)
        with mp.Pool(processes=N_WORKERS) as pool:
            for fn, feats, err in tqdm(
                pool.imap(process_file, all_files, chunksize=64),
                total=len(all_files), desc="Extracting"
            ):
                extraction.append((fn, feats))
                if err:
                    parse_errors += 1
        used = f"processes (n={N_WORKERS})"
    except Exception as e:
        # If anything about Pool/pickling fails, fallback to threads (I/O-bound is fine)
        from concurrent.futures import ThreadPoolExecutor
        print(f"[warn] Process pool failed ({type(e).__name__}: {e}); falling back to threads.")
        # The pool may have delivered some results before failing; drop them so
        # no file is recorded (or counted as an error) twice.
        extraction.clear()
        parse_errors = 0
        max_workers = max(8, (os.cpu_count() or 8) * 4)
        with ThreadPoolExecutor(max_workers=max_workers) as exr:
            # Executor.map yields results in input order, matching pool.imap.
            for fn, feats, err in tqdm(
                exr.map(process_file, all_files),
                total=len(all_files), desc="Extracting"
            ):
                extraction.append((fn, feats))
                if err:
                    parse_errors += 1
        used = f"threads (n≈{max_workers})"

    # Diagnostics
    nonempty = [(fn, f) for fn, f in extraction if f]
    empty    = len(extraction) - len(nonempty)
    print(f"\nExtraction summary [{used}]:")
    print(f"  Total files       : {len(extraction):,}")
    print(f"  Non-empty feature : {len(nonempty):,}")
    print(f"  Empty feature     : {empty:,}")
    print(f"  Read/parse errors : {parse_errors:,}")

    print("\nExample features from first NON-empty file:")
    if nonempty:
        fn, feats = nonempty[0]
        print("•", Path(fn).name, "→", len(feats), "features | sample:",
              list(itertools.islice(sorted(feats), 8)))
    else:
        print("(!) All empty — check INCLUDE_* flags, DATA_DIR, and YAML validity.")

    return extraction

# ---- main entry point ----
# Runs the normalized extraction over the file list built by the discovery
# cell; `extraction` (list of (filepath, feature_set)) feeds the later cells.
if __name__ == "__main__":
    try:
        all_files  # ensure it exists from the previous cell
    except NameError:
        # Turn the bare NameError into an actionable message for the reader.
        raise RuntimeError("`all_files` is not defined. Run the cell that builds the file list first.")
    extraction = run_extraction(all_files)


Extracting features from 29,069 files...


Extracting: 100%|███████████████████████| 29069/29069 [00:12<00:00, 2366.50it/s]



Extraction summary [processes (n=7)]:
  Total files       : 29,069
  Non-empty feature : 29,069
  Empty feature     : 0
  Read/parse errors : 0

Example features from first NON-empty file:
• 0-vortex_github-actions-dependent-jobs-example_contents_.github_workflows_deploy.Pipeline → 32 features | sample: ['eq:jobs.build.runs-on==ubuntu-latest', "eq:jobs.build.steps.[].if==github.ref == 'refs/heads/main' && github.event_name == 'push'", 'eq:jobs.build.steps.[].name==checkout repository', 'eq:jobs.build.steps.[].name==install dependencies', 'eq:jobs.build.steps.[].name==install npm@7', 'eq:jobs.build.steps.[].name==release', 'eq:jobs.build.steps.[].name==setup node', 'eq:jobs.build.steps.[].run==npm ci\nnpm audit --production\nnpm test']


Cell 5 — Build transactions, prune vocabulary, boolean matrix

In [11]:

from collections import Counter
from pathlib import Path

# Number of extracted documents (the mining cell later rebinds N_DOCS to the
# number of kept transactions).
N_DOCS = len(extraction)

# --- document-frequency of features ---
# Each feature is counted at most once per document.
df_counter = Counter()
for _, feats in extraction:
    df_counter.update(set(feats))

# Absolute document-frequency bounds derived from the pruning knobs.
min_df = int(MIN_DOC_FREQ)
max_df = int(MAX_DOC_FREQ_RATIO * max(1, N_DOCS))

# --- build vocab with pruning ---
# Keep features whose document frequency lies in [min_df, max_df].
vocab = [f for f, c in df_counter.items() if c >= min_df and c <= max_df]
if TOP_K_FEATURES:
    # Optional cap: keep only the TOP_K_FEATURES most frequent survivors.
    top = sorted(df_counter.items(), key=lambda x: x[1], reverse=True)
    keep = set(vocab)
    vocab = [f for f, _ in top if f in keep][:TOP_K_FEATURES]
vocab = sorted(vocab)
v_index = {f: i for i, f in enumerate(vocab)}   # feature -> column index
v_set = set(v_index)  # speed up membership checks

# --- build transactions (kept docs) ---
# transactions[i] lists the in-vocabulary features of doc_index_to_file[i];
# documents left with no in-vocabulary feature are dropped.
transactions = []
doc_index_to_file = []
for fn, feats in extraction:
    inter = [f for f in feats if f in v_set]
    if inter:
        transactions.append(inter)
        doc_index_to_file.append(fn)

# --- identify pruned files (and why) ---
# 1) No features extracted at all (likely parse/empty or extraction knobs filtered everything)
pruned_empty_features = sorted(fn for fn, feats in extraction if not feats)

# 2) Had features, but ALL were pruned away by vocab thresholds
pruned_after_vocab = sorted(
    fn for fn, feats in extraction
    if feats and not any((f in v_set) for f in feats)
)

# 3) Union: any reason
kept_set   = set(doc_index_to_file)
all_set    = set(fn for fn, _ in extraction)
pruned_all = sorted(all_set - kept_set)

# Sanity check: every dropped file is explained by exactly the two reasons above.
assert set(pruned_all) == set(pruned_empty_features) | set(pruned_after_vocab)

# --- helper to save lists ---
def _write_list(path: Path, items):
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as f:
        for x in items:
            f.write(str(x) + "\n")

def _basenames(paths):
    # keep only filenames, useful when you just want the .Pipeline names
    return sorted({Path(p).name for p in paths}, key=str.lower)

# --- save outputs (paths + basenames) ---
# Full paths of the dropped files, one list per pruning reason.
_write_list(SAVE_DIR / "pruned_empty_features_paths.txt", pruned_empty_features)
_write_list(SAVE_DIR / "pruned_after_vocab_paths.txt",  pruned_after_vocab)
_write_list(SAVE_DIR / "pruned_all_paths.txt",          pruned_all)

# Same lists reduced to unique file names.
_write_list(SAVE_DIR / "pruned_empty_features_names.txt", _basenames(pruned_empty_features))
_write_list(SAVE_DIR / "pruned_after_vocab_names.txt",  _basenames(pruned_after_vocab))
_write_list(SAVE_DIR / "pruned_all_names.txt",          _basenames(pruned_all))

# --- summary ---
print(f"Docs kept: {len(transactions):,}/{N_DOCS:,}")
print(f"Unique features (pruned): {len(vocab):,}")
# Mean number of in-vocabulary features per kept document (0 when nothing was kept).
avg_feats = round(np.mean([len(t) for t in transactions]) if transactions else 0, 2)
print("Avg features/doc:", avg_feats)

print("\nPruning breakdown (files):")
print(f"  • No extracted features         : {len(pruned_empty_features):,}  → {SAVE_DIR/'pruned_empty_features_names.txt'}")
print(f"  • All features pruned by vocab  : {len(pruned_after_vocab):,}  → {SAVE_DIR/'pruned_after_vocab_names.txt'}")
print(f"  • Total pruned (union)          : {len(pruned_all):,}  → {SAVE_DIR/'pruned_all_names.txt'}")

# Small peek
if pruned_all:
    print("\nExample pruned (first 5):")
    for p in pruned_all[:5]:
        print(" -", Path(p).name)


Docs kept: 29,065/29,069
Unique features (pruned): 2,529
Avg features/doc: 29.4

Pruning breakdown (files):
  • No extracted features         : 0  → /Users/ankita/Desktop/Thesis-work/pattern_outputs2/pruned_empty_features_names.txt
  • All features pruned by vocab  : 4  → /Users/ankita/Desktop/Thesis-work/pattern_outputs2/pruned_after_vocab_names.txt
  • Total pruned (union)          : 4  → /Users/ankita/Desktop/Thesis-work/pattern_outputs2/pruned_all_names.txt

Example pruned (first 5):
 - 609880ed07f994e8607e8283b3a868d027c9223b.Pipeline
 - a7d8b362ee149799766528f8516e0119a764b42b.Pipeline
 - hassio-addons_workflows_contents_.github_workflows_addon-deploy.Pipeline
 - rpc-org_CWE-094-test_contents_.github_workflows_comment_issue.Pipeline


Cell 6 — Mine frequent itemsets (FP-Growth / Closed / Maximal): support ≥ MIN_SUPPORT_RATIO, length between MIN_ITEMSET_LEN and MAX_ITEMSET_LEN

1. How it works: We always mine all frequent itemsets with fpgrowth.

    If MINING_MODE == "maximal", we drop any itemset that has a larger frequent superset.

    If MINING_MODE == "closed", we drop an itemset only when a larger frequent superset has the same absolute support (so the subset carries no extra information).

    If MINING_MODE == "all", we return everything as mined.


2. when min_support_abs = 5814, that means

if we set MIN_SUPPORT_RATIO = 0.20 (20%). With 29,066 docs kept:

Multiply: 29,066 × 0.20 = 5,813.2

Take ceiling so we don’t under-count: ceil(5,813.2) = 5,814
So an itemset must occur in at least 5,814 files to be considered frequent. This only captures extremely common constructs. Reduce MIN_SUPPORT_RATIO to surface more detailed constructs, i.e. those occurring in fewer files.


In [13]:
# Cell 6 — Mine frequent itemsets ("all" | "maximal" | "closed") without SciPy/TE

import math
import numpy as np
import pandas as pd
from mlxtend.frequent_patterns import fpgrowth  # this submodule works with numpy/pandas only


# From here on N_DOCS is the number of kept transactions (it was the number of
# extracted documents in the vocabulary cell).
N_DOCS = len(transactions)
# Absolute support threshold; used for reporting here, while fpgrowth below
# receives the ratio itself.
min_supp_abs = max(1, math.ceil(MIN_SUPPORT_RATIO * N_DOCS))
print(f"min_support_abs = {min_supp_abs} of {N_DOCS} docs (~{MIN_SUPPORT_RATIO:.0%})")

results_itemsets = []  # list[(frozenset(str), int)]

if N_DOCS == 0:
    print("No transactions to mine. Check feature extraction / pruning earlier.")
else:
    # --- One-hot encode WITHOUT TransactionEncoder (avoids SciPy import) ---
    # We build a dense boolean matrix shaped (docs x |vocab|).
    # This is straightforward and fast for ~tens of millions of booleans.
    V = len(vocab)
    arr = np.zeros((N_DOCS, V), dtype=bool)
    for i, T in enumerate(transactions):
        for f in T:
            j = v_index[f]
            arr[i, j] = True

    # Column labels are the feature strings, so mined itemsets carry feature names.
    df_bool = pd.DataFrame(arr, columns=vocab)

    # --- Mine all frequent itemsets via FP-Growth (ratio threshold) ---
    freq = fpgrowth(
        df_bool,
        min_support=MIN_SUPPORT_RATIO,  # ratio (e.g., 0.20)
        use_colnames=True,
        max_len=MAX_ITEMSET_LEN
    )

    # Keep only itemsets of desired min length
    freq["itemset_len"] = freq["itemsets"].apply(len)
    freq = freq[freq["itemset_len"] >= MIN_ITEMSET_LEN].reset_index(drop=True)

    # Convert support ratio → absolute support
    freq["support_abs"] = (freq["support"] * N_DOCS).round().astype(int)

    # --- Optional post-filters ---
    mode = str(MINING_MODE).lower()
    if mode in ("maximal", "closed"):
        # Drop any set that has a frequent superset (maximal) or a superset with equal support (closed)
        itemsets = list(freq["itemsets"])
        supports = list(freq["support_abs"])
        lens = freq["itemset_len"].tolist()

        # Index by length for fewer subset checks
        by_len = {}
        for idx, L in enumerate(lens):
            by_len.setdefault(L, []).append(idx)

        lengths = sorted(by_len.keys())
        keep = [True] * len(itemsets)

        # Lengths are visited in ascending order, so each itemset is compared
        # only against strictly longer candidates.
        for pos, L in enumerate(lengths):
            longer_idxs = [j for L2 in lengths[pos + 1:] for j in by_len[L2]]
            for i in by_len[L]:
                if not keep[i]:
                    continue
                A = itemsets[i]
                Sa = supports[i]
                for j in longer_idxs:
                    if not keep[j]:
                        continue
                    B = itemsets[j]
                    if A.issubset(B):
                        if mode == "maximal":
                            # Any frequent superset disqualifies A.
                            keep[i] = False
                            break
                        else:  # "closed"
                            # Only a superset with identical support disqualifies A.
                            if Sa == supports[j]:
                                keep[i] = False
                                break

        freq = freq[keep].reset_index(drop=True)

    # Package output: (frozenset(items), absolute_support)
    results_itemsets = [
        (frozenset(s), int(supp))
        for s, supp in zip(freq["itemsets"], freq["support_abs"])
    ]

print(f"Mined itemsets (pre-verification): {len(results_itemsets):,}")
print("Sample:", results_itemsets[:5])


min_support_abs = 5813 of 29065 docs (~20%)
Mined itemsets (pre-verification): 242
Sample: [(frozenset({'exists:name'}), 25504), (frozenset({'exists:on.push'}), 9586), (frozenset({'exists:jobs.build'}), 7067), (frozenset({'exists:jobs.build.steps.[]'}), 7057), (frozenset({'exists:jobs.build.steps'}), 7057)]


In [14]:
# Show the ten highest-support mined singletons and pairs to confirm the
# threshold is correct. Each entry of results_itemsets is (frozenset, support).
singletons = [(s, c) for s, c in results_itemsets if len(s) == 1]
print("Top mined singletons:")
for iset, supp in sorted(singletons, key=lambda x: x[1], reverse=True)[:10]:
    # A singleton holds exactly one feature; print it without the set wrapper.
    print(f"{supp:6d}  {next(iter(iset))}")

pairs = [(s, c) for s, c in results_itemsets if len(s) == 2]
print("\nTop mined pairs:")
for iset, supp in sorted(pairs, key=lambda x: x[1], reverse=True)[:10]:
    # Sort the two features so the printed pair is stable.
    print(f"{supp:6d}  {sorted(list(iset))}")


Top mined singletons:
 25504  exists:name
  9586  exists:on.push
  7604  exists:on.workflow_call
  7067  exists:jobs.build
  7057  exists:jobs.build.steps.[]
  7057  exists:jobs.build.steps
  7057  exists:jobs.build.runs-on
  7023  exists:on.workflow_call.inputs
  6890  exists:on.push.branches
  6808  exists:on.push.branches.[]

Top mined pairs:
  8541  ['exists:name', 'exists:on.push']
  7057  ['exists:jobs.build', 'exists:jobs.build.steps.[]']
  7057  ['exists:jobs.build.steps', 'exists:jobs.build.steps.[]']
  7057  ['exists:jobs.build', 'exists:jobs.build.steps']
  7057  ['exists:jobs.build.runs-on', 'exists:jobs.build.steps']
  7057  ['exists:jobs.build.runs-on', 'exists:jobs.build.steps.[]']
  7057  ['exists:jobs.build', 'exists:jobs.build.runs-on']
  7023  ['exists:on.workflow_call', 'exists:on.workflow_call.inputs']
  6940  ['exists:jobs.build', 'exists:name']
  6931  ['exists:jobs.build.steps.[]', 'exists:name']


In [15]:
# DIAGNOSTIC: top singletons and rough top-pair supports

from collections import Counter
import itertools

# 1) Singleton doc frequencies
singleton_df = Counter()
for T in transactions:
    singleton_df.update(set(T))

# Rank every feature once. The printed list and the pair analysis take
# different-sized prefixes of this ranking; slicing the pair analysis from the
# 50-entry display list would silently cap K at 50.
ranked_single = sorted(singleton_df.items(), key=lambda x: x[1], reverse=True)
top_single = ranked_single[:50]
print("Top 20 single features (doc frequency):")
for f, c in top_single[:20]:
    print(f"{c:6d}  {f}")
if top_single:
    print(f"\nMax singleton support: {top_single[0][1]} / {len(transactions)}")
else:
    # Guard against an empty ranking instead of failing on top_single[0].
    print("\nNo features found in transactions.")

# 2) Rough top pairs among the top-K singletons (K=200)
K = 200
top_feats = [f for f,_ in ranked_single[:K]]
feat_to_idx = {f:i for i,f in enumerate(top_feats)}
pair_counts = Counter()

for T in transactions:
    # Sorted indices give every pair a canonical (i < j) key.
    present = [feat_to_idx[f] for f in T if f in feat_to_idx]
    present.sort()
    for i, j in itertools.combinations(present, 2):
        pair_counts[(i,j)] += 1

if pair_counts:
    ((i,j), cnt) = pair_counts.most_common(1)[0]
    print(f"\nTop pair count among top {K}: {cnt} / {len(transactions)}")
    print("Pair features:")
    print("  •", top_feats[i])
    print("  •", top_feats[j])
else:
    print("\nNo pairs among the top features (very unlikely).")


Top 20 single features (doc frequency):
 25504  exists:name
  9586  exists:on.push
  7604  exists:on.workflow_call
  7067  exists:jobs.build
  7057  exists:jobs.build.steps
  7057  exists:jobs.build.steps.[]
  7057  exists:jobs.build.runs-on
  7023  exists:on.workflow_call.inputs
  6890  exists:on.push.branches
  6808  exists:on.push.branches.[]
  6770  exists:jobs.build.steps.[].uses
  6540  exists:on.pull_request
  6538  exists:jobs.build.steps.[].run
  6367  exists:jobs.build.steps.[].name
  5720  eq:jobs.build.runs-on==ubuntu-latest
  5411  exists:jobs.build.steps.[].with
  4460  exists:on.pull_request.branches
  4425  exists:on.pull_request.branches.[]
  3999  exists:on.[]
  3942  eq:on.push.branches.[]==main

Max singleton support: 25504 / 29065

Top pair count among top 200: 8541 / 29065
Pair features:
  • exists:name
  • exists:on.push


Cell 7 — Deterministic verifier: structural matching with wildcards

Goal: For each candidate itemset (e.g., {eq:jobs[].runs-on==ubuntu-latest, exists:on.push}), verify on raw YAML that all items hold in the same file.

Matcher semantics

exists:a.b.[].c → follow a → b; b must be a list, and at least one of its elements must contain key c (the rest of the path is followed exactly from that element).

eq:a.b.[].runs-on==ubuntu-latest → at least one element in list b has runs-on equal to ubuntu-latest (compared after value normalization).

For non-list paths, exact path must exist.

In [17]:
# %% [markdown]
# ## Deterministic verifier
# A file matches a pattern iff EVERY feature in the itemset is satisfied on the parsed YAML.

def _path_parts(path: str) -> List[str]:
    return [p for p in path.split(".") if p]

def _lookup_child(node: Any, head: str):
    """Return ``(found, child)`` for the mapping entry whose normalized key is ``head``.

    Feature paths are built from ``_norm_key``-normalized keys, so a raw key
    such as the integer ``8080`` or ``" name "`` must match the segments
    ``"8080"`` / ``"name"``.
    """
    if not isinstance(node, dict):
        return False, None
    if head in node:  # fast path: key already in normalized form
        return True, node[head]
    for raw_key, child in node.items():
        if _norm_key(raw_key) == head:
            return True, child
    return False, None

def _match_exists(node: Any, parts: List[str]) -> bool:
    """True when the path ``parts`` can be followed from ``node``.

    A ``WILDCARD`` segment requires a list. As the last segment it matches the
    list itself (even an empty one, mirroring the ``exists:<path>.[]`` feature
    the extractor emits for every list); otherwise at least one element must
    match the remaining segments.
    """
    if not parts: return True
    head, *rest = parts
    if head == WILDCARD:
        if not isinstance(node, list):
            return False
        if not rest:
            return True
        return any(_match_exists(ch, rest) for ch in node)
    found, child = _lookup_child(node, head)
    return found and _match_exists(child, rest)

def _match_eq(node: Any, parts: List[str], target: str) -> bool:
    """True when a scalar reached via ``parts`` normalizes (``_norm_value``) to ``target``."""
    if not parts:
        return _norm_value(node) == target
    head, *rest = parts
    if head == WILDCARD:
        return isinstance(node, list) and any(_match_eq(ch, rest, target) for ch in node)
    found, child = _lookup_child(node, head)
    return found and _match_eq(child, rest, target)

def verify_item_on_doc(doc: Any, feature: str) -> bool:
    """Check one ``exists:<path>`` or ``eq:<path>==<value>`` feature against a parsed document.

    Returns False for a ``None`` document, an unrecognized feature prefix, a
    malformed ``eq:`` feature, or when matching raises.
    NOTE: paths are split on "." and "==" at their first occurrence, so keys
    that themselves contain "." or "==" cannot be addressed.
    """
    if doc is None: return False
    try:
        if feature.startswith("exists:"):
            return _match_exists(doc, _path_parts(feature[7:]))
        if feature.startswith("eq:"):
            lhs_rhs = feature[3:]
            if "==" not in lhs_rhs: return False
            path, val = lhs_rhs.split("==", 1)
            return _match_eq(doc, _path_parts(path), val)
        return False
    except Exception:
        # Best effort: a feature that cannot be evaluated counts as "not matched".
        return False

def verify_itemset_on_file(p: str, itemset: Iterable[str]) -> bool:
    """True when EVERY feature of ``itemset`` holds on the YAML file at ``p``.

    The file is parsed with ``_yaml_load``, i.e. the same loader used during
    feature extraction. A loader that keeps every scalar as a string would
    never reproduce ``eq:...==null`` features and would disagree with the
    extractor on numeric scalars.
    An unreadable or unparsable file matches no feature.
    """
    try:
        doc = _yaml_load(Path(p).read_text(encoding="utf-8", errors="ignore"))
    except Exception:
        doc = None
    return all(verify_item_on_doc(doc, it) for it in itemset)

print("Verifier ready.")


Verifier ready.


Cell 8 — Run verification on mined candidates (batched, parallel), save results

This produces:

verified_patterns.jsonl → each line: {itemset, support_mined, support_verified, files_path}

Per-pattern filename lists (pattern_<idx>.txt) for your cross-validation with Java.


In [19]:

# ## Cell 8 — FAST verification via inverted index (no re-parsing YAML), save results, build summaries

from pathlib import Path
import json, math
import pandas as pd
from tqdm import tqdm
from collections import defaultdict

# ---------- helpers ----------
def _load_jsonl(p: Path):
    rows = []
    with p.open("r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if line:
                rows.append(json.loads(line))
    return rows

def _save_jsonl(rows, p: Path):
    with p.open("w", encoding="utf-8") as f:
        for r in rows:
            f.write(json.dumps(r) + "\n")

# ---------- compute min_supp_abs ----------
# Keep an existing non-None min_supp_abs; otherwise derive it from the number of
# indexed documents (0 when doc_index_to_file is not defined), never going below 1.
if globals().get("min_supp_abs") is None:
    _N = len(globals().get("doc_index_to_file", ()))
    min_supp_abs = max(1, math.ceil(MIN_SUPPORT_RATIO * _N))
print(f"[Cell 8] Using min_supp_abs = {min_supp_abs}")

# ---------- try reload first ----------
# Reuse results saved by an earlier run when present; part files take precedence
# over the combined file.
# NOTE(review): this glob looks for "verified_patterns_maximal_part_*.jsonl", while the
# verification step further down saves its parts as "verified_patterns_part_NNN.jsonl",
# so parts written by this cell are never matched here — confirm which name is intended.
verified_all = None
SAVE_DIR.mkdir(parents=True, exist_ok=True)
part_files = sorted(SAVE_DIR.glob("verified_patterns_maximal_part_*.jsonl"))
final_file = SAVE_DIR / "verified_patterns.jsonl"

if part_files:
    verified_all = []
    for pf in part_files:
        verified_all += _load_jsonl(pf)
    print(f"Reloaded {len(verified_all)} rows from {len(part_files)} part files.")
elif final_file.exists():
    verified_all = _load_jsonl(final_file)
    print(f"Reloaded {len(verified_all)} rows from {final_file.name}.")

# ---------- if nothing reloaded, run fast verification ----------
# Runs when the reload step produced None or an empty list. Each mined itemset is
# verified by intersecting per-feature posting lists (feature -> set of file indices)
# built from the already-extracted feature sets, so no YAML file is parsed again.
if not verified_all:
    if "results_itemsets" not in globals() or not results_itemsets:
        raise RuntimeError("No mined itemsets in memory. Re-run Cell 6 first.")

    # Map kept files -> their FULL feature sets from extraction (not pruned)
    kept_files = list(doc_index_to_file)                             # order aligned with transactions
    # NOTE(review): fn_to_idx is not used anywhere in the visible part of this cell.
    fn_to_idx = {fn: i for i, fn in enumerate(kept_files)}
    full_feats_per_file = [set() for _ in range(len(kept_files))]

    # Build once from Cell 4 extraction
    # extraction: list of (filename, feats_full)
    src = {fn: feats for (fn, feats) in extraction}
    for i, fn in enumerate(kept_files):
        # Files missing from extraction get an empty feature set.
        full_feats_per_file[i] = src.get(fn, set())

    # Collect ALL features that appear in mined itemsets to minimize index size
    features_needed = set()
    for iset, _supp in results_itemsets:
        features_needed.update(iset)

    # Build posting lists only for needed features
    postings = defaultdict(set)   # feature -> set(indices)
    for idx, feats in enumerate(full_feats_per_file):
        # only index features we need (feats must be a set for the & to work)
        for f in feats & features_needed:
            postings[f].add(idx)

    # Verify by intersection (no YAML reparse)
    print(f"Verifying {len(results_itemsets):,} itemsets over {len(kept_files):,} files via postings...")
    verified_all = []
    # NOTE(review): this rebinds the notebook-level BATCH_VERIFY config constant; later
    # cells will see the auto-sized value instead of the one set in the setup cell.
    BATCH_VERIFY = max(100, min(1000, len(results_itemsets)))  # auto-batch size
    for i in tqdm(range(0, len(results_itemsets), BATCH_VERIFY), desc="Verifying (postings)"):
        chunk = results_itemsets[i:i+BATCH_VERIFY]
        batch_out = []
        for k, (iset, supp_mined) in enumerate(chunk):
            # intersect postings
            it = iter(iset)
            try:
                first = next(it)
            except StopIteration:
                # empty itemset shouldn't happen, guard anyway
                cand = set()
            else:
                # Copy the first posting set so the in-place &= below never mutates the index.
                cand = set(postings.get(first, set()))
                for f in it:
                    cand &= postings.get(f, set())
                    if not cand:
                        # No file left that contains all items seen so far; stop early.
                        break

            # materialize filenames
            files = [kept_files[j] for j in sorted(cand)]
            batch_out.append({
                "idx": k,                    # position inside the current batch
                "global_idx": i + k,         # position inside results_itemsets
                "itemset": sorted(list(iset)),
                "support_mined": int(supp_mined),
                "support_verified": len(files),
                "files": files
            })

        # save part
        # NOTE(review): parts are saved as "verified_patterns_part_NNN.jsonl", which the
        # reload step's glob ("verified_patterns_maximal_part_*.jsonl") does not match.
        part_path = SAVE_DIR / f"verified_patterns_part_{i//BATCH_VERIFY:03d}.jsonl"
        _save_jsonl(batch_out, part_path)
        verified_all.extend(batch_out)

    # save combined
    _save_jsonl(verified_all, final_file)
    print("Saved:", final_file)

# Rows that lack a global_idx (e.g. reloaded from an older file) get their list position.
for i, r in enumerate(verified_all):
    r.setdefault("global_idx", i)

# ---------- strict summary (size ≥ MIN_ITEMSET_LEN, verified ≥ min_supp_abs) ----------
summary_cols = ["pattern_gid","itemset","size","support_mined","support_verified"]

def _summary_rows(records, min_size, min_verified):
    """Build flat summary rows for the verified patterns that pass both thresholds.

    records      -- iterable of verified-pattern dicts (the rows of verified_all)
    min_size     -- minimum number of items a pattern must contain
    min_verified -- minimum support_verified a pattern must reach

    Returns a list of dicts keyed by summary_cols. Entries that are not dicts, or
    whose "itemset" / "support_verified" is missing or None, are skipped rather
    than raising, so the strict and fallback summaries apply the same guards.
    """
    out = []
    for r in records:
        if not isinstance(r, dict):
            continue
        itemset = r.get("itemset")
        verified = r.get("support_verified")
        if itemset is None or verified is None:
            continue
        if len(itemset) < min_size or verified < min_verified:
            continue
        out.append({
            "pattern_gid": r.get("global_idx", r.get("idx", -1)),
            "itemset": " & ".join(itemset),
            "size": len(itemset),
            "support_mined": int(r.get("support_mined", 0)),
            "support_verified": int(verified),
        })
    return out

def _ranked_summary(summary_rows):
    """Return summary_rows as a DataFrame ordered by verified support, size, mined support (all descending)."""
    df = pd.DataFrame(summary_rows, columns=summary_cols)
    if df.empty:
        return df
    return df.sort_values(
        ["support_verified","size","support_mined"], ascending=[False, False, False]
    ).reset_index(drop=True)

rows = _summary_rows(verified_all, MIN_ITEMSET_LEN, min_supp_abs)
summary_df = _ranked_summary(rows)

if not summary_df.empty:
    display(summary_df.head(40))
    summary_csv = SAVE_DIR / "verified_patterns_summary.csv"
    summary_df.to_csv(summary_csv, index=False)
    print("Saved strict summary:", summary_csv)
    print(f"Patterns meeting criteria (size≥{MIN_ITEMSET_LEN}, verified≥{min_supp_abs}): {len(summary_df):,}")
else:
    print(f"No patterns met the strict thresholds (size≥{MIN_ITEMSET_LEN}, verified≥{min_supp_abs}).")
    # Fallback preview: size ≥ MIN_ITEMSET_LEN, verified ≥ 1
    fallback_rows = _summary_rows(verified_all, MIN_ITEMSET_LEN, 1)
    fb_df = _ranked_summary(fallback_rows)
    if not fb_df.empty:
        # Report the actual size threshold (the original printed the constant's name verbatim).
        print(f"\nFallback preview (size≥{MIN_ITEMSET_LEN}, verified≥1):")
        display(fb_df.head(40))
        fb_csv = SAVE_DIR / "verified_patterns_summary_fallback.csv"
        fb_df.to_csv(fb_csv, index=False)
        print("Saved fallback:", fb_csv)
    else:
        print("Even fallback is empty—consider lowering MIN_SUPPORT_RATIO or relaxing pruning.")

# ---------- helper: export filenames for any pattern by global id ----------
def export_pattern_filenames_by_gid(pattern_gid: int, out_name: str = None) -> Path:
    """Write the matching files of one verified pattern to SAVE_DIR/pattern_<label>.txt.

    pattern_gid -- the pattern's global_idx in verified_all
    out_name    -- optional label for the output file; defaults to "id_<gid, 6 digits>"

    Returns the path that was written. Raises KeyError when no row in
    verified_all carries the requested global_idx.
    """
    hit = next((row for row in verified_all if row.get("global_idx") == pattern_gid), None)
    if hit is None:
        raise KeyError(f"pattern_gid {pattern_gid} not found")
    label = out_name or f"id_{pattern_gid:06d}"
    out_path = SAVE_DIR / f"pattern_{label}.txt"
    filenames = hit["files"]
    with out_path.open("w", encoding="utf-8") as f:
        f.writelines(fn + "\n" for fn in filenames)
    print(f"Exported {len(hit['files'])} filenames → {out_path}")
    return out_path


[Cell 8] Using min_supp_abs = 5813
Verifying 242 itemsets over 29,065 files via postings...


Verifying (postings): 100%|███████████████████████| 1/1 [00:00<00:00,  1.23it/s]


Saved: /Users/ankita/Desktop/Thesis-work/pattern_outputs2/verified_patterns.jsonl


Unnamed: 0,pattern_gid,itemset,size,support_mined,support_verified
0,0,exists:name,1,25504,25504
1,1,exists:on.push,1,9586,9586
2,14,exists:name & exists:on.push,2,8541,8541
3,12,exists:on.workflow_call,1,7604,7604
4,2,exists:jobs.build,1,7067,7067
5,36,exists:jobs.build & exists:jobs.build.runs-on ...,4,7057,7057
6,22,exists:jobs.build & exists:jobs.build.steps & ...,3,7057,7057
7,30,exists:jobs.build.runs-on & exists:jobs.build....,3,7057,7057
8,31,exists:jobs.build & exists:jobs.build.runs-on ...,3,7057,7057
9,33,exists:jobs.build & exists:jobs.build.runs-on ...,3,7057,7057


Saved strict summary: /Users/ankita/Desktop/Thesis-work/pattern_outputs2/verified_patterns_summary.csv
Patterns meeting criteria (size≥1, verified≥5813): 242


Cell 9 — verification


In [80]:

# Select patterns and export *filenames only* (one .txt per pattern)
# Two ways to select:
# 1) By global ids (pattern_gid values from the summary)
# 2) By exact itemsets (list of lists of feature strings)

from pathlib import Path
from typing import Iterable, Tuple, List

# ---- OPTION 1: select by ids (EDIT THIS) ----
# pattern_gid values from the summary table,
# e.g. fpgrowth "all": [232, 29], "maximal": [3], "closed": []
SELECTED_PATTERN_IDS = [92]

# ---- OPTION 2: select by exact itemsets (EDIT THIS) ----
# A list of itemsets, each itself a list of feature strings, e.g.
# [["eq:jobs[].runs-on==ubuntu-latest","eq:jobs[].steps[].uses==actions/checkout@v4"]]
# or [["exists:on.push.branches"]]
SELECTED_ITEMSETS: List[List[str]] = []

# Lookup table: global_idx -> verified row
by_gid = dict((row["global_idx"], row) for row in verified_all)

def _only_pipeline_filenames(paths: List[str]) -> List[str]:
    """Return unique, sorted base names ending with .Pipeline from a list of paths."""
    names = []
    for fn in paths:
        name = Path(fn).name
        if name.endswith(".Pipeline"):
            names.append(name)
    # unique + stable order
    return sorted(set(names), key=str.lower)

def _write_filelist(label: str, files: List[str]):
    """Write the unique .Pipeline base names from files to SAVE_DIR/pattern_<label>.txt.

    One name per line. Returns (output path, number of names written).
    """
    target = SAVE_DIR / f"pattern_{label}.txt"
    names = _only_pipeline_filenames(files)
    target.write_text("".join(name + "\n" for name in names), encoding="utf-8")
    return target, len(names)

# Collected (label, output path, number of names) for every list written by this cell.
exported = []

# Export by pattern ids
for gid in SELECTED_PATTERN_IDS:
    row = by_gid.get(gid)
    if row is None:
        print(f"[warn] pattern_gid {gid} not found; skipping")
        continue
    label = f"id_{gid:06d}"
    pth, n = _write_filelist(label, row["files"])
    exported.append((label, pth, n))

# Export by exact itemsets
import zlib

def _norm_iset(iset: Iterable[str]) -> Tuple[str, ...]:
    """Canonical, order-independent key for an itemset: its items as a sorted tuple."""
    return tuple(sorted(iset))

def _iset_label(itemset: Iterable[str]) -> str:
    """Build a reproducible file label for an itemset.

    Each feature contributes a 6-digit-max CRC32 code. The built-in hash() is
    salted per Python process for strings, so labels derived from it changed on
    every kernel restart; CRC32 of the UTF-8 bytes is stable across runs.
    """
    codes = (str(zlib.crc32(x.encode("utf-8")) % 10**6) for x in itemset)
    return "iset_" + "_".join(codes)

target_itemsets_norm = {_norm_iset(x) for x in SELECTED_ITEMSETS}

for r in verified_all:
    if _norm_iset(r["itemset"]) in target_itemsets_norm:
        label = _iset_label(r["itemset"])
        pth, n = _write_filelist(label, r["files"])
        exported.append((label, pth, n))

if exported:
    print("Exported filename lists (filenames only):")
    for label, pth, n in exported:
        print(f"• {label}: {n} files → {pth}")
else:
    print("No patterns selected yet. Fill SELECTED_PATTERN_IDS or SELECTED_ITEMSETS and re-run this cell.")


Exported filename lists (filenames only):
• id_000092: 3015 files → /Users/ankita/Desktop/Thesis-work/Pattern-Mining/pattern_outputs2/pattern_id_000092.txt


The comparison below was successful for (/Input/sample_jobs_builds_runs-on_steps.pattern):
1. File 1: /Users/ankita/Desktop/Thesis-work/pattern_outputs/pattern_id_000029.txt
   File 2: /Users/ankita/Desktop/Thesis-work/Generic M2T metrics/Patternmatching_files_all_files_sample_confname_jobs_builds_runs-on_steps.txt

The comparison below found differences for (/Input/sample_on_push_branches.pattern):
1. File 1: /Users/ankita/Desktop/Thesis-work/pattern_outputs/pattern_id_000233.txt
   File 2: /Users/ankita/Desktop/Thesis-work/Generic M2T metrics/Patternmatching_files_all_files_sample_on_push_branches.txt

In [82]:
# Compare the filenames listed in two text files (one name or path per line).
# Config — edit these two paths:
file1 = "/Users/ankita/Desktop/Thesis-work/Pattern-mining/pattern_outputs2/pattern_id_000092.txt"
file2 = "/Users/ankita/Desktop/Thesis-work/Generic M2T metrics/Patternmatching_files_all_files.txt"

# Normalization flags
CASE_SENSITIVE = False          # True -> names must match exactly, including case
REQUIRE_EXTENSION = ".Pipeline" # None -> keep every name; otherwise keep only names with this suffix

# Output folder: reuse SAVE_DIR when an earlier cell defined it, else the current directory
if "SAVE_DIR" not in globals():
    from pathlib import Path
    SAVE_DIR = Path(".")
SAVE_DIR.mkdir(parents=True, exist_ok=True)

from pathlib import Path

def _load_names(path, case_sensitive=False, require_ext=None):
    names = []
    path = Path(path)
    with path.open("r", encoding="utf-8", errors="ignore") as f:
        for line in f:
            s = line.strip()
            if not s or s.startswith("#"):
                continue
            base = Path(s).name  # strip directories if present
            if require_ext and not base.endswith(require_ext):
                continue
            names.append(base if case_sensitive else base.lower())
    # unique + stable order for display
    uniq = sorted(set(names))
    return uniq

# Load both name lists with identical normalization, then compare them as sets.
list1 = _load_names(file1, CASE_SENSITIVE, REQUIRE_EXTENSION)
list2 = _load_names(file2, CASE_SENSITIVE, REQUIRE_EXTENSION)

set1 = set(list1)
set2 = set(list2)

matches = sorted(set1.intersection(set2))   # present in both files
only1 = sorted(set1.difference(set2))       # present only in file1
only2 = sorted(set2.difference(set1))       # present only in file2

# Counts
total1, total2 = len(set1), len(set2)
match_count = len(matches)
nonmatch_count = len(only1) + len(only2)

# Report the comparison settings and counts.
print("=== Filename comparison summary ===")
print(f"File 1: {file1}")
print(f"File 2: {file2}")
print(f"Case sensitive: {CASE_SENSITIVE}")
print(f"Require extension: {REQUIRE_EXTENSION if REQUIRE_EXTENSION else 'None'}\n")

print(f"Total unique in file1: {total1}")
print(f"Total unique in file2: {total2}")
print(f"Matching count        : {match_count}")
print(f"Non-matching total    : {nonmatch_count}")
print(f"  ├─ Only in file1    : {len(only1)}")
print(f"  └─ Only in file2    : {len(only2)}")

# Save results: one name per line; an empty list produces an empty file.
out_matches = SAVE_DIR / "compare_matches.txt"
out_only1   = SAVE_DIR / "compare_only_in_file1.txt"
out_only2   = SAVE_DIR / "compare_only_in_file2.txt"

for out_path, names in ((out_matches, matches), (out_only1, only1), (out_only2, only2)):
    out_path.write_text("\n".join(names) + ("\n" if names else ""), encoding="utf-8")

print("\nSaved:")
print(f"• Matches           → {out_matches}")
print(f"• Only in file1     → {out_only1}")
print(f"• Only in file2     → {out_only2}")

# Optional: show a quick peek of differences
def _peek(lst, n=10):
    return "\n".join(lst[:n]) if lst else "(none)"

# Heading and preview are emitted together; sep="\n" keeps them on separate lines.
print("\nTop matches (up to 10):", _peek(matches), sep="\n")
print("\nTop only-in-file1 (up to 10):", _peek(only1), sep="\n")
print("\nTop only-in-file2 (up to 10):", _peek(only2), sep="\n")


=== Filename comparison summary ===
File 1: /Users/ankita/Desktop/Thesis-work/Pattern-mining/pattern_outputs2/pattern_id_000092.txt
File 2: /Users/ankita/Desktop/Thesis-work/Generic M2T metrics/Patternmatching_files_all_files.txt
Case sensitive: False
Require extension: .Pipeline

Total unique in file1: 3015
Total unique in file2: 3015
Matching count        : 3015
Non-matching total    : 0
  ├─ Only in file1    : 0
  └─ Only in file2    : 0

Saved:
• Matches           → /Users/ankita/Desktop/Thesis-work/Pattern-Mining/pattern_outputs2/compare_matches.txt
• Only in file1     → /Users/ankita/Desktop/Thesis-work/Pattern-Mining/pattern_outputs2/compare_only_in_file1.txt
• Only in file2     → /Users/ankita/Desktop/Thesis-work/Pattern-Mining/pattern_outputs2/compare_only_in_file2.txt

Top matches (up to 10):
0-vortex_github-actions-dependent-jobs-example_contents_.github_workflows_deploy.pipeline
00143f50002a05b8faad1fbb93ce3e0d85bde964.pipeline
00gxd14g_atomic-red-team-pandas_contents_.gith