In [1]:
import re
import pandas as pd
import numpy as np
import spacy
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.decomposition import TruncatedSVD
from sklearn.metrics.pairwise import cosine_similarity
import hdbscan
from tqdm import tqdm
from pathlib import Path
import warnings
warnings.filterwarnings("ignore", category=FutureWarning)

In [2]:
experiential_terms = {
    'emotional': [
        'felt','feeling','emotion','joy','fear','anxiety','bliss','love','terror','peace','calm',
        'excited','overwhelmed','gratitude','euphoria','sadness','longing','crying','ecstasy','relief',
        'compassion','grief','awe','anger','release','hope','despair','serenity','agitation','comfort',
        'purging','vulnerability','intimacy','empathy','tension','melancholy','abandon','appreciation'
    ],
    'sensory': [
        'visual','hear','sound','color','bright','pattern','geometry','music','taste','smell','see',
        'saw','colors','sounds','shapes','textures','movement','melting','vibrations','pulsing',
        'fractal','echo','flashing','tunnel','fluid','shimmering','sparkling','synesthesia',
        'auditory','trails','glow','hallucination','pulsate','distortion','radiance','static',
        'blurred','lightness','glimmer','resonance','tactile','kaleidoscopic'
    ],
    'cognitive': [
        'thought','mind','consciousness','aware','realize','understand','insight','clarity','confused',
        'clear','thinking','perception','concepts','identity','ego','dissolve','looping','logic',
        'recognition','belief','interpretation','memory','language','narrative','meaning','mindspace',
        'headspace','overthinking','mental','clarification','self-talk','rational','intellect',
        'philosophical','metacognition','rumination','stream of consciousness','inner dialogue',
        'cognitive dissonance','hyperfocus'
    ],
    'physical': [
        'body','skin','breath','heart','energy','vibration','tingling','warm','heavy','light','pressure',
        'sensation','nausea','shaking','sweating','floating','stillness','tightness','spasm','motion',
        'trembling','cold','breathing','heartbeat','twitching','dry mouth','muscles','stiffness',
        'paralysis','numbness','restlessness','chills','sweat','clenching','somatic','bodyload',
        'temperature','digestive','physical release'
    ],
    'mystical': [
        'ego','self','unity','divine','spiritual','transcend','infinite','oneness','god','universe',
        'connected','sacred','eternal','death','rebirth','timeless','interconnected','presence','source',
        'void','light','beyond','higher power','awakening','realm','dimension','truth','immortality',
        'ego death','no-self','nirvana','cosmic','transcendence','pure being','karma','light being',
        'soul','heaven','angelic','time distortion','godlike','divinity','portal','third eye',
        'nondual','dissolution','samsara','infinity','entity','timelessness'
    ],
    'temporal': [
        'onset','peak','comedown','duration','timeline','hours','minutes','start','beginning',
        'after','later','build-up','before','end','wave','early','gradual','suddenly',
        'phase','stage','passed','elapsed','over time','rush','fade','linger','moment',
        'slowly','time passed','time distorted','hour mark','entry','exit'
    ]
}

# Flatten to lowercase set
EXP_TERMS_FLAT = {word.lower() for words in experiential_terms.values() for word in words}

# Load SpaCy
try:
    nlp = spacy.load("en_core_web_sm", disable=["parser", "tagger", "lemmatizer", "ner"])
    if "senter" not in nlp.pipe_names:
        nlp.add_pipe("sentencizer")
    print(" SpaCy loaded.")
except OSError:
    raise RuntimeError("SpaCy model not found. Run: python -m spacy download en_core_web_sm")

from spacy.lang.en.stop_words import STOP_WORDS as SPACY_STOPWORDS
SPACY_STOPWORDS = {word.lower() for word in SPACY_STOPWORDS}
print(f" {len(SPACY_STOPWORDS)} stopwords loaded.")

 SpaCy loaded.
 326 stopwords loaded.


In [3]:
_whitespace_re = re.compile(r"[ \t\v\f]+")
_newlines_re   = re.compile(r"\s*\n\s*")

def clean_text_basic(txt: str) -> str:
    if not isinstance(txt, str) or pd.isna(txt):
        return ""
    txt = txt.lower()
    txt = txt.replace("\r\n", "\n").replace("\r", "\n")
    txt = _newlines_re.sub("\n", txt)
    txt = _whitespace_re.sub(" ", txt).strip()
    return txt

def sentencize_and_clean(raw_text: str, nlp):
    raw_text_clean = clean_text_basic(raw_text)
    doc = nlp.make_doc(raw_text_clean)
    if "senter" in nlp.pipe_names:
        nlp.get_pipe("senter")(doc)
    else:
        doc = nlp(raw_text_clean)
    sents = [s.text.strip() for s in doc.sents if s.text.strip()]
    return sents, raw_text_clean


_word_re_cache = {}
def _has_term(sentence: str, term: str) -> bool:
    rx = _word_re_cache.get(term)
    if rx is None:
        term_rx = r"\b" + r"\s+".join(map(re.escape, term.split())) + r"\b"
        rx = re.compile(term_rx)
        _word_re_cache[term] = rx
    return rx.search(sentence) is not None

def _exp_terms_in(sentences):
    present = set()
    for t in EXP_TERMS_FLAT:
        if any(_has_term(s, t) for s in sentences):
            present.add(t)
    return present

In [4]:
def lsa_hdbscan_summary(
    sentences,
    n_components: int = 24,
    min_cluster_size: int = 7,
    min_samples: int = 1,
    top_k_ratio: float = 0.25,
    max_clusters: int = 2,
    pos_bias: float = 0.0
) -> str:
    if not sentences:
        return ""
    sents = [s for s in sentences if s.strip()]
    if len(sents) < 2:
        return " ".join(sents)

    # TF-IDF + LSA
    tfidf = TfidfVectorizer(lowercase=False, stop_words="english", max_features=2000, sublinear_tf=True, norm="l2")
    X_tfidf = tfidf.fit_transform(sents)
    n_features = X_tfidf.shape[1]

    max_rank = max(2, min(n_components, n_features - 1, len(sents) - 1, 64))
    if max_rank < 2:
        k = max(1, int(round(len(sents) * top_k_ratio)))
        return " ".join(sents[:k])

    svd = TruncatedSVD(n_components=max_rank, random_state=42)
    X_lsa = svd.fit_transform(X_tfidf)
    X_lsa = X_lsa / (np.linalg.norm(X_lsa, axis=1, keepdims=True) + 1e-10)

    # HDBSCAN Clustering
    clusterer = hdbscan.HDBSCAN(
        min_cluster_size=min_cluster_size,
        min_samples=min_samples,
        metric='euclidean',
        cluster_selection_method='eom',
        prediction_data=True
    )
    labels = clusterer.fit_predict(X_lsa)

    unique_labels = np.setdiff1d(np.unique(labels), [-1])
    if len(unique_labels) == 0:
        k = max(1, int(round(len(sents) * top_k_ratio)))
        return " ".join(sents[:k])

    # Cluster scoring
    try:
        probs = clusterer.probabilities_
    except Exception:
        probs = np.ones(len(sents))

    doc_exp_terms = _exp_terms_in(sents)
    cluster_scores = []

    for lbl in unique_labels:
        idxs = np.where(labels == lbl)[0]
        size = len(idxs)
        if size == 0:
            continue
        prob_mean = float(np.mean(probs[idxs]))
        sents_in_cluster = [sents[i] for i in idxs]
        exp_in_cluster = _exp_terms_in(sents_in_cluster)
        exp_coverage = (len(exp_in_cluster & doc_exp_terms) / max(1, len(doc_exp_terms))) if doc_exp_terms else 1.0
        score = (size**0.7) * (prob_mean**1.2) * (0.6 + 0.4*exp_coverage)
        cluster_scores.append((score, idxs))

    # Select top clusters
    cluster_scores.sort(key=lambda z: z[0], reverse=True)
    cluster_scores = cluster_scores[:max_clusters]

    # Allocate sentences proportionally
    k_final = max(1, int(round(len(sents) * top_k_ratio)))
    if not cluster_scores:
        return " ".join(sents[:k_final])

    weights = np.array([s for s, _ in cluster_scores], dtype=float)
    weights = weights / (weights.sum() + 1e-9)
    quotas = np.maximum(1, np.round(weights * k_final)).astype(int)
    while quotas.sum() > k_final:
        quotas[np.argmax(quotas)] -= 1
    while quotas.sum() < k_final:
        quotas[np.argmin(quotas)] += 1

    selected = []
    for (score, idxs), q in zip(cluster_scores, quotas):
        centroid = np.mean(X_lsa[idxs], axis=0)
        centroid /= (np.linalg.norm(centroid) + 1e-10)
        sims = cosine_similarity(X_lsa[idxs], centroid.reshape(1, -1)).ravel()
        sims = sims + pos_bias * (1.0 - (idxs / (len(sents) + 1e-9)))
        order = idxs[np.argsort(-sims)]
        selected.extend(order[:q])

    # Deduplicate & sort by original position
    selected = sorted(dict.fromkeys(selected))
    return " ".join(sents[i] for i in selected[:k_final])

In [5]:
def _tfidf_cosine(a: str, b: str) -> float:
    if not a.strip() or not b.strip():
        return 0.0
    vec = TfidfVectorizer(lowercase=False, stop_words="english", max_features=4000)
    try:
        X = vec.fit_transform([a, b])
        return float(cosine_similarity(X[0], X[1])[0,0])
    except Exception:
        return 0.0

def score_semantic(summary: str, document: str) -> float:
    return _tfidf_cosine(summary, document)

def score_experiential(summary: str, document: str) -> float:
    if not summary or not document:
        return 0.0
    doc_sents = [s.strip() for s in re.split(r'(?<=[.!?])\s+', document) if s.strip()]
    sum_sents = [s.strip() for s in re.split(r'(?<=[.!?])\s+', summary) if s.strip()]
    doc_terms = _exp_terms_in(doc_sents)
    if not doc_terms:
        return 1.0
    sum_terms = _exp_terms_in(sum_sents)
    return len(doc_terms & sum_terms) / len(doc_terms)

def score_coherence(summary: str) -> float:
    if not summary.strip():
        return 0.0
    sents = [s.strip() for s in re.split(r'(?<=[.!?])\s+', summary) if s.strip()]
    if len(sents) < 2:
        return 1.0
    vec = TfidfVectorizer(lowercase=False, stop_words="english", max_features=4000)
    try:
        X = vec.fit_transform(sents)
        sims = [float(cosine_similarity(X[i], X[i+1])[0,0]) for i in range(len(sents)-1)]
        return float(np.mean(sims)) if sims else 0.0
    except Exception:
        return 0.0

def custom_score(summary: str, document: str) -> float:
    if not summary or not document:
        return 0.0
    sem = score_semantic(summary, document)
    exp = score_experiential(summary, document)
    coh = score_coherence(summary)
    return 0.5*sem + 0.3*exp + 0.2*coh

In [6]:
best_params = {
    "n_components": 24,
    "min_cluster_size": 7,
    "min_samples": 1,
    "top_k_ratio": 0.25,
    "pos_bias": 0.0
}

In [None]:
DATA_DIR = Path(r"")
test_files = {
    "DMT": DATA_DIR / "dmt_test_100.csv",
    "LSD": DATA_DIR / "lsd_test_100.csv",
    "Psilocybin": DATA_DIR / "mushroom_test_100.csv"
}

def _norm_sub(x):
    if not isinstance(x, str): return "OTHER"
    y = x.strip().upper()
    if y in {"DMT"}: return "DMT"
    if y in {"LSD", "ACID"}: return "LSD"
    if y in {"PSILOCYBIN", "PSILOCYBIN MUSHROOM", "MUSHROOM", "MUSHROOMS", "PSILOCYBE"}:
        return "Psilocybin"
    return y

results = []

for substance, file_path in test_files.items():
    print(f"\n Processing {substance}...")
    if not file_path.exists():
        print(f" File not found: {file_path}")
        continue

    df = pd.read_csv(file_path)
    if "report_text" not in df.columns:
        raise KeyError(f"Missing 'report_text' in {file_path}")

    df["_subst_norm"] = df["substance"].map(_norm_sub)
    df = df[df["_subst_norm"] == substance].copy()
    print(f"Loaded {len(df)} reports for {substance}")

    # Precompute sentences
    cache = []
    for text in df["report_text"].astype(str).fillna("").tolist():
        sents, cleaned = sentencize_and_clean(text, nlp)
        cache.append({"sentences": sents, "doc_clean": cleaned})

    # Generate summaries
    summaries = []
    semantic_scores = []
    experiential_scores = []
    coherence_scores = []
    final_scores = []

    for entry in cache:
        summary = lsa_hdbscan_summary(entry["sentences"], **best_params)
        summaries.append(summary)

        sem = score_semantic(summary, entry["doc_clean"])
        exp = score_experiential(summary, entry["doc_clean"])
        coh = score_coherence(summary)
        final = custom_score(summary, entry["doc_clean"])

        semantic_scores.append(sem)
        experiential_scores.append(exp)
        coherence_scores.append(coh)
        final_scores.append(final)

    # Add summary column
    df["summary"] = summaries
    output_path = file_path.with_name(file_path.stem + "_with_summary.csv")
    df.to_csv(output_path, index=False)
    print(f"Saved summaries to {output_path}")

    # Aggregate scores
    avg_sem = np.mean(semantic_scores)
    avg_exp = np.mean(experiential_scores)
    avg_coh = np.mean(coherence_scores)
    avg_final = np.mean(final_scores)

    results.append({
        "Model": "M2",
        "Substance": substance,
        "Semantic (TF-IDF/SBERT)": f"{avg_sem:.2f} (TF-IDF)",
        "Experiential": f"{avg_exp:.2f}",
        "Coherence (TF-IDF/SBERT)": f"{avg_coh:.2f} (TF-IDF)",
        "Final Score": f"{avg_final:.2f}"
    })

    print(f" {substance} - Avg Final Score: {avg_final:.3f}")


 Processing DMT...
Loaded 100 reports for DMT
Saved summaries to D:\GitHub\Psychedelics Summary\Dataset\Train Test\dmt_test_100_with_summary.csv
 DMT - Avg Final Score: 0.375

 Processing LSD...
Loaded 100 reports for LSD
Saved summaries to D:\GitHub\Psychedelics Summary\Dataset\Train Test\lsd_test_100_with_summary.csv
 LSD - Avg Final Score: 0.402

 Processing Psilocybin...
Loaded 100 reports for Psilocybin


In [None]:
results_df = pd.DataFrame(results)
print("\n" + "="*70)
print(" FINAL RESULTS: LSA+HDBSCAN (M2) ON TEST SET")
print("="*70)
print(results_df.to_string(index=False))

In [None]:
# Save to CSV
results_df.to_csv(DATA_DIR / "lsa_hdbscan_test_scores.csv", index=False)
print(f"\n Results saved to {DATA_DIR / 'lsa_hdbscan_test_scores.csv'}")