In [None]:
import numpy as np
import networkx as nx
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.feature_extraction.text import TfidfVectorizer

def embed_texts(texts, max_features=512, normalize=True):
    """
    Simple text embedding using TF-IDF vectors.
    Returns a list of numpy arrays (one per text). Vectors are optionally L2-normalized.

    Args:
        texts: Iterable of strings to embed; ``None`` is treated as empty.
        max_features: Upper bound on the TF-IDF vocabulary size, i.e. on the
            length of each returned vector.
        normalize: When True each vector is scaled to unit L2 norm; when False
            the raw TF-IDF weights are returned.

    Returns:
        A list with one 1-D numpy array per input text, or an empty list when
        there is nothing to embed.
    """
    docs = [] if texts is None else list(texts)
    if not docs:
        # No documents means no vocabulary to fit, so skip the vectorizer.
        return []
    # Imported locally, like the other helpers in this notebook, so the block
    # does not depend on cell execution order.
    from sklearn.feature_extraction.text import TfidfVectorizer
    vectorizer = TfidfVectorizer(
        max_features=max_features,
        # The vectorizer applies the row normalization itself; None disables it.
        norm='l2' if normalize else None,
    )
    matrix = vectorizer.fit_transform(docs).toarray()
    # Iterating a 2-D array yields its rows, giving one vector per text.
    return list(matrix)

# Create or update a joblib pre-trained embeddings file from DB (supabase)
import os
import joblib
import numpy as np
from dotenv import load_dotenv
load_dotenv()
# Optional SBERT encoder: remains None when the package or the model cannot be
# loaded, in which case the TF-IDF path is used when embeddings are computed.
sbert = None
try:
    from sentence_transformers import SentenceTransformer
    sbert = SentenceTransformer('all-MiniLM-L6-v2')
except Exception:
    sbert = None

# Supabase credentials come from the environment (ensure they are set in the notebook env).
SUPABASE_URL = os.environ.get('SUPABASE_URL')
SUPABASE_KEY = os.environ.get('SUPABASE_KEY')

# The client is only built when both credentials are present; any import or
# construction failure leaves `sb` as None.
sb = None
try:
    from supabase import create_client
    if SUPABASE_URL and SUPABASE_KEY:
        sb = create_client(SUPABASE_URL, SUPABASE_KEY)
except Exception:
    sb = None

# Fetch past novelty entries from DB table 'novelty_reports' (fallback to storage not implemented here).
# For every row this collects up to five 'unique_sections' strings plus a
# 1000-character snippet of the raw text, keeping first-seen order.
# Assumes each row's 'result' is a dict with an optional list under
# 'unique_sections' and an optional string under 'raw_text' or 'raw' -- TODO confirm
# against the table schema. Values of any other type are skipped.
past_texts = []
if sb is not None:
    try:
        res = sb.table('novelty_reports').select('filename,result').limit(2000).execute()
        rows = getattr(res, 'data', []) or []
        # Set used for O(1) duplicate checks; it always holds exactly what was appended.
        seen_past = set()
        for r in rows:
            content = r.get('result') or {}
            if not isinstance(content, dict):
                # A malformed payload should not abort the remaining rows.
                continue
            sections = content.get('unique_sections')
            if not isinstance(sections, list):
                sections = []
            # pick unique_sections if present, else raw snippet
            for s in sections[:5]:
                if isinstance(s, str) and s and s not in seen_past:
                    seen_past.add(s)
                    past_texts.append(s)
            raw = content.get('raw_text') or content.get('raw') or ''
            # Compare the truncated snippet (the value actually stored), not the
            # full raw text, so repeated long documents are detected.
            snippet = raw[:1000] if isinstance(raw, str) else ''
            if snippet and snippet not in seen_past:
                seen_past.add(snippet)
                past_texts.append(snippet)
    except Exception as e:
        print('Supabase read failed in notebook:', e)

# Embedding artifacts live in a 'pre-trained' folder under the current working
# directory; the folder is created on demand.
PRETRAIN_DIR = os.path.join(os.getcwd(), 'pre-trained')
JOBLIB_PATH = os.path.join(PRETRAIN_DIR, 'novelty_embeddings.joblib')
os.makedirs(PRETRAIN_DIR, exist_ok=True)

def _fit_nn_index(matrix, label):
    """Fit a cosine NearestNeighbors index on ``matrix``, or return None.

    Args:
        matrix: 2-D array with one embedding per row.
        label: Backend name, used only in the failure message.

    Returns:
        The fitted index, or None when there are fewer than two rows or when
        importing/fitting scikit-learn's NearestNeighbors fails.
    """
    try:
        from sklearn.neighbors import NearestNeighbors
        row_count = matrix.shape[0]
        if row_count <= 1:
            return None
        index = NearestNeighbors(n_neighbors=min(10, row_count - 1), metric='cosine', algorithm='auto')
        index.fit(matrix)
        return index
    except Exception as _e:
        # The index is an optional speed-up; callers can still score without it.
        print(f'Failed to build NN index ({label}):', _e)
        return None


def compute_and_save_embeddings(texts):
    """Embed ``texts`` and write the result to ``JOBLIB_PATH`` with joblib.

    SBERT is used when the module-level ``sbert`` model is available; if it is
    missing or encoding fails, a TF-IDF vectorizer (max 1024 features, English
    stop words, rows L2-normalized) is fitted instead.

    The saved dict contains 'texts', 'embs' (nested lists), 'nn' (a fitted
    NearestNeighbors index or None) and 'backend' ('sbert' or 'tfidf'); the
    TF-IDF variant also stores the fitted vectorizer under 'vec'. For empty
    input only ``{'texts': [], 'embs': None}`` is written.

    Args:
        texts: List of strings to embed.

    Returns:
        None. If both backends fail, a message is printed and nothing is
        written, so a previously saved file is left as it was.
    """
    if not texts:
        joblib.dump({'texts': [], 'embs': None}, JOBLIB_PATH)
        return
    # Prefer SBERT embeddings when available
    if sbert is not None:
        try:
            embs = sbert.encode(texts, convert_to_tensor=False, show_progress_bar=True)
            embs = np.vstack([np.array(e).astype(np.float32) for e in embs])
            nn = _fit_nn_index(embs, 'SBERT')
            joblib.dump({'texts': texts, 'embs': embs.tolist(), 'nn': nn, 'backend': 'sbert'}, JOBLIB_PATH)
            print('Saved SBERT embeddings to', JOBLIB_PATH)
            return
        except Exception as e:
            # Fall through to TF-IDF rather than leaving the caller without a store.
            print('SBERT embedding failed:', e)
    # fallback to TF-IDF if SBERT not available
    try:
        from sklearn.feature_extraction.text import TfidfVectorizer
        vec = TfidfVectorizer(max_features=1024, stop_words='english')
        X = vec.fit_transform(texts).toarray()
        # normalize rows; all-zero rows keep norm 1 to avoid division by zero
        norms = np.linalg.norm(X, axis=1, keepdims=True)
        norms[norms == 0] = 1
        X = (X / norms).astype(np.float32)
        nn = _fit_nn_index(X, 'TF-IDF')
        joblib.dump({'texts': texts, 'embs': X.tolist(), 'nn': nn, 'vec': vec, 'backend': 'tfidf'}, JOBLIB_PATH)
        print('Saved TF-IDF embeddings to', JOBLIB_PATH)
        return
    except Exception as e:
        print('TF-IDF fallback failed:', e)

# Strip each entry, drop blanks and repeats (first occurrence wins), and stop at
# 2000 entries so the saved file stays manageable.
seen = set()
unique_texts = []
for entry in past_texts:
    cleaned = (entry or '').strip()
    if not cleaned or cleaned in seen:
        continue
    seen.add(cleaned)
    unique_texts.append(cleaned)
    if len(unique_texts) >= 2000:
        break

compute_and_save_embeddings(unique_texts)

# Read the artifact back as a sanity check and report how many texts it holds.
try:
    data = joblib.load(JOBLIB_PATH)
    stored_texts = data.get('texts', [])
    print('joblib contains texts:', len(stored_texts))
except Exception as load_err:
    print('Failed to load saved joblib:', load_err)

In [4]:
# Load libraries
import os
import joblib
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

sns.set(style="whitegrid")

# Supabase client built from the environment variables; `sb` stays None when a
# credential is missing or the client cannot be imported/created.
SUPABASE_URL = os.environ.get('SUPABASE_URL')
SUPABASE_KEY = os.environ.get('SUPABASE_KEY')
sb = None
try:
    from supabase import create_client
    if SUPABASE_URL and SUPABASE_KEY:
        sb = create_client(SUPABASE_URL, SUPABASE_KEY)
except Exception:
    sb = None

# Pull up to 2000 rows together with their stored label and result payload;
# `rows` stays empty when there is no client or the query fails.
rows = []
if sb is not None:
    try:
        query = sb.table('novelty_reports').select('filename,novelty_percentage,result').limit(2000)
        res = query.execute()
        rows = getattr(res, 'data', []) or []
    except Exception as e:
        print('Failed to read novelty_reports from Supabase:', e)

# Build parallel lists of representative text and integer novelty label.
# Rows without usable text or without a parseable label are skipped.
texts = []
labels = []
for r in rows:
    res_json = (r.get('result') or {})
    if not isinstance(res_json, dict):
        # Treat an unexpected payload type like a missing payload.
        res_json = {}
    # Prefer the label stored inside the result payload, but fall back to the
    # 'novelty_percentage' column when the payload does not carry one.
    label = res_json.get('novelty_percentage')
    if label is None:
        label = r.get('novelty_percentage')
    try:
        label = int(label) if label is not None else None
    except (TypeError, ValueError, OverflowError):
        # Non-numeric labels are treated as missing.
        label = None
    # pick representative text: first unique_section or raw snippet
    text = ''
    if res_json:
        us = res_json.get('unique_sections') or []
        if us:
            text = us[0]
        else:
            text = (res_json.get('raw_text') or res_json.get('raw') or '')[:1000]
    if not text or label is None:
        continue
    texts.append(text)
    labels.append(label)

# Evaluate the similarity heuristic against the stored labels (at most 1000 rows)
# and plot actual-vs-predicted, the error distribution and a calibration curve.
n = min(len(texts), 1000)
if n == 0:
    print('No labeled rows found to evaluate.')
else:
    texts = texts[:n]
    labels = labels[:n]

    # Try loading precomputed joblib embeddings (if available)
    # NOTE(review): `emb_store` is loaded here but not used by anything below in
    # this cell -- confirm whether it was meant to be the comparison corpus.
    JOBLIB_PATH = os.path.join(os.getcwd(), 'pre-trained', 'novelty_embeddings.joblib')
    emb_store = None
    try:
        data = joblib.load(JOBLIB_PATH)
        emb_store = data.get('embs', None)
        if emb_store is not None:
            emb_store = np.asarray(emb_store)
    except Exception as e:
        emb_store = None

    # Create embeddings for our evaluation texts (prefer SBERT, fallback TF-IDF)
    try:
        from sentence_transformers import SentenceTransformer
        embedder = SentenceTransformer('all-MiniLM-L6-v2')
        eval_embs = embedder.encode(texts, convert_to_tensor=False, show_progress_bar=False)
        eval_embs = np.vstack([np.array(e).astype(np.float32) for e in eval_embs])
    except Exception as e:
        print('SBERT not available; falling back to TF-IDF for evaluation:', e)
        vec = TfidfVectorizer(max_features=1024, stop_words='english')
        X = vec.fit_transform(texts).toarray()
        norms = np.linalg.norm(X, axis=1, keepdims=True)
        norms[norms == 0] = 1
        eval_embs = (X / norms).astype(np.float32)

    # Heuristic prediction: novelty = (1 - mean(top-k similarity to other docs)) * 100
    # One pairwise similarity matrix replaces a per-row copy of the embedding
    # matrix; the diagonal is masked so a document is never compared with itself.
    sim_matrix = cosine_similarity(eval_embs).astype(np.float64)
    np.fill_diagonal(sim_matrix, -np.inf)
    k = min(5, sim_matrix.shape[0] - 1)
    if k <= 0:
        # A single document has nothing to be compared with.
        preds = [100.0] * sim_matrix.shape[0]
    else:
        # Ascending sort per row: the last k columns are the k most similar docs.
        topk = np.sort(sim_matrix, axis=1)[:, -k:]
        preds = ((1.0 - topk.mean(axis=1)) * 100.0).tolist()

    # Metrics
    mae = mean_absolute_error(labels, preds)
    mse = mean_squared_error(labels, preds)
    rmse = mse ** 0.5
    r2 = r2_score(labels, preds)
    print(f'Count: {len(labels)}; MAE: {mae:.2f}; RMSE: {rmse:.2f}; R2: {r2:.3f}')

    df = pd.DataFrame({'actual': labels, 'pred': preds})

    # Scatter: actual vs predicted (red dashed line marks perfect agreement)
    plt.figure(figsize=(7, 6))
    sns.scatterplot(data=df, x='actual', y='pred', alpha=0.6)
    plt.plot([0, 100], [0, 100], 'r--')
    plt.xlabel('Actual Novelty (%)')
    plt.ylabel('Predicted Novelty (%)')
    plt.title('Actual vs Predicted Novelty')
    plt.tight_layout()
    plt.show()

    # Residuals histogram
    df['error'] = df['pred'] - df['actual']
    plt.figure(figsize=(7, 4))
    sns.histplot(df['error'], bins=40, kde=True)
    plt.title('Prediction Error Distribution (pred - actual)')
    plt.xlabel('Error')
    plt.tight_layout()
    plt.show()

    # Calibration: bin by actual novelty and show mean predicted vs actual per-bin
    df['bin'] = pd.cut(df['actual'], bins=10)
    # `observed` is passed explicitly so grouping on the categorical bins behaves
    # the same across pandas versions (the default changed between releases).
    calib = df.groupby('bin', observed=False).agg(mean_actual=('actual', 'mean'), mean_pred=('pred', 'mean'), count=('actual', 'count')).reset_index()
    plt.figure(figsize=(8, 4))
    sns.lineplot(data=calib, x='mean_actual', y='mean_pred', marker='o')
    plt.plot([0, 100], [0, 100], 'r--')
    plt.xlabel('Mean Actual Novelty (bin)')
    plt.ylabel('Mean Predicted Novelty')
    plt.title('Calibration by Actual Novelty Bins')
    plt.tight_layout()
    plt.show()

No labeled rows found to evaluate.


In [None]:
# --- Ensure joblib contains a fitted NearestNeighbors index (if missing) ---
import os
import joblib
import numpy as np
JOBLIB_PATH = os.path.join(os.getcwd(), 'pre-trained', 'novelty_embeddings.joblib')

# Any failure to read the store is treated the same as "no store present".
try:
    data = joblib.load(JOBLIB_PATH)
except Exception as e:
    data = {}

if not data:
    print('No pre-trained joblib found to update.')
else:
    texts = data.get('texts', [])
    embs = data.get('embs', None)
    try:
        if embs is not None:
            embs_arr = np.asarray(embs).astype(np.float32)
            index_missing = data.get('nn') is None
            # Only build an index when none is stored and there are at least two rows.
            if index_missing and embs_arr.shape[0] > 1:
                try:
                    from sklearn.neighbors import NearestNeighbors
                    neighbor_count = min(10, embs_arr.shape[0] - 1)
                    nn = NearestNeighbors(n_neighbors=neighbor_count, metric='cosine', algorithm='auto')
                    nn.fit(embs_arr)
                    data['nn'] = nn
                    # Write the store back so later loads find the fitted index.
                    joblib.dump(data, JOBLIB_PATH)
                    print('Updated joblib with NearestNeighbors index')
                except Exception as e:
                    print('Failed to build or save NN index:', e)
    except Exception as e:
        print('Error while ensuring NN index:', e)

In [None]:
# --- Demo: load the pre-trained joblib and score a sample text, then produce the formatted comment ---
import joblib
import os
import numpy as np
JOBLIB_PATH = os.path.join(os.getcwd(), 'pre-trained', 'novelty_embeddings.joblib')

# Fall back to an empty store when the file cannot be loaded.
store = {}
try:
    store = joblib.load(JOBLIB_PATH)
except Exception as e:
    print('Failed to load joblib:', e)
    store = {}

# Unpack the saved pieces; anything missing becomes None (or [] for the texts).
texts = store.get('texts', [])
embs, vec, nn = (store.get(key) for key in ('embs', 'vec', 'nn'))

# When the store does not record its backend, guess 'sbert' if embeddings are
# present and 'tfidf' otherwise.
if embs is not None:
    default_backend = 'sbert'
else:
    default_backend = 'tfidf'
backend = store.get('backend', default_backend)

# Optional SBERT encoder for query texts; None when it cannot be loaded.
sbert_local = None
try:
    from sentence_transformers import SentenceTransformer
    sbert_local = SentenceTransformer('all-MiniLM-L6-v2')
except Exception:
    sbert_local = None

def embed_one(text):
    """Embed a single text with the backend recorded in the loaded store.

    Order of preference:
      1. SBERT, when ``backend`` is 'sbert' and ``sbert_local`` is loaded.
      2. The stored TF-IDF vectorizer ``vec``, when ``backend`` is 'tfidf'.
      3. A TF-IDF vectorizer fitted on the fly over ``[text] + texts``.

    TF-IDF results are L2-normalized. If the on-the-fly fallback fails, a
    message is printed and a length-1 zero vector is returned.

    NOTE(review): the on-the-fly vectorizer is fitted independently of the
    stored embeddings, so its output presumably does not share their feature
    space -- confirm before relying on scores produced through this path.
    """
    def _unit_row(matrix):
        # Scale the single row to unit length; a zero row is left unchanged.
        lengths = np.linalg.norm(matrix, axis=1, keepdims=True)
        lengths[lengths == 0] = 1
        return (matrix / lengths)[0]

    use_sbert = backend == 'sbert' and sbert_local is not None
    if use_sbert:
        encoded = sbert_local.encode([text], convert_to_tensor=False, show_progress_bar=False)
        return np.array(encoded[0]).astype(np.float32)

    use_stored_tfidf = backend == 'tfidf' and vec is not None
    if use_stored_tfidf:
        return _unit_row(vec.transform([text]).toarray().astype(np.float32))

    # fallback: simple TF-IDF on the fly
    try:
        from sklearn.feature_extraction.text import TfidfVectorizer
        adhoc = TfidfVectorizer(max_features=1024, stop_words='english')
        adhoc.fit([text] + texts)
        return _unit_row(adhoc.transform([text]).toarray().astype(np.float32))
    except Exception as e:
        print('Fallback embed failed:', e)
        return np.zeros((1,), dtype=np.float32)

def _novelty_from_sims(sims):
    """Turn similarity values into a novelty percentage within [0, 100].

    Novelty is ``(1 - mean(sims)) * 100``; an empty input counts as mean
    similarity 0. The result is clipped because cosine similarity can be
    negative or marginally above 1, which would otherwise give values outside
    the 0-100 scale.
    """
    mean_sim = float(np.mean(sims)) if len(sims) > 0 else 0.0
    return float(np.clip((1.0 - mean_sim) * 100.0, 0.0, 100.0))


def score_text(text, top_k=5):
    """Score how novel ``text`` is relative to the stored embeddings.

    The score is ``(1 - mean similarity to the top_k nearest stored texts) * 100``,
    clipped to [0, 100]. The stored NearestNeighbors index ``nn`` is used when
    present; if it is absent or the query fails, a full cosine-similarity scan
    over ``embs`` is used instead.

    Args:
        text: Text to score.
        top_k: Number of most similar stored texts to average over.

    Returns:
        Novelty as a float in [0, 100]. Returns 100.0 when there is no store,
        the store is empty, ``top_k`` is not positive, or both lookup paths fail.
    """
    if embs is None:
        return 100.0
    emb_arr = np.asarray(embs).astype(np.float32)
    if emb_arr.ndim != 2 or emb_arr.shape[0] == 0:
        # Nothing stored to compare against, so the text counts as fully novel.
        return 100.0
    k = min(top_k, emb_arr.shape[0])
    if k <= 0:
        return 100.0
    # Embed only once a comparison is actually possible.
    emb = embed_one(text)
    if nn is not None:
        try:
            dists, idxs = nn.kneighbors([emb], n_neighbors=k)
            # The index is fitted with metric='cosine', so distance = 1 - similarity.
            return _novelty_from_sims(1.0 - dists[0])
        except Exception as e:
            print('NN kneighbors failed:', e)
    # fallback cosine similarity
    try:
        from sklearn.metrics.pairwise import cosine_similarity
        sims = cosine_similarity([emb], emb_arr)[0]
        topk = -np.sort(-sims)[:k]
        return _novelty_from_sims(topk)
    except Exception as e:
        print('Cosine similarity fallback failed:', e)
        return 100.0

def format_comment(score, summary=None, recs=None):
    """Format a novelty score as a one-line review comment.

    Output shape:
    ``Score: <n>/100 Changeable: <100-n>% <summary> Recommended actions: <recs joined by spaces>``

    Args:
        score: Novelty score. It is rounded to the nearest integer and clamped
            to 0..100 so that both the "/100" score and the "Changeable"
            percentage stay within range.
        summary: Optional summary sentence(s). Defaults to a generic
            placeholder -- replace with Gemini-generated text in production if
            available.
        recs: Optional list of recommendation strings. Defaults to three
            generic recommendations.

    Returns:
        The formatted comment string.

    Raises:
        ValueError: If ``score`` is NaN (it cannot be rounded to an integer).
    """
    score_int = min(100, max(0, int(round(score))))
    changeable = 100 - score_int
    if summary is None:
        # Short generic summary used when the caller supplies none.
        summary = "The proposal introduces an integrated coal waste-to-energy process combining gasification and carbon-capture at pilot scale. The approach appears genuinely novel and should be prioritised for further validation."
    if recs is None:
        recs = [
            "Document prior art and clearly highlight novel integration steps versus published work.",
            "Provide pilot test plans and small-scale validation data to substantiate claimed innovations.",
            "Include IP or patent landscape notes where applicable to strengthen novelty claims.",
        ]
    # Build the formatted block exactly as requested
    actions = ' '.join(recs)
    return f"Score: {score_int}/100 Changeable: {changeable}% {summary} Recommended actions: {actions}"

# Demo run: score the first stored text (if present), otherwise a placeholder
# made of one sample sentence repeated ten times.
sample = texts[0] if texts else (
    'This is a sample novelty text describing a pilot-scale integration of gasification with carbon capture aimed at coal waste. ' * 10
)
score = score_text(sample)
print('Novelty score:', round(score, 2))
# The header is printed with a blank line before and after it, followed by the comment.
print('\nFormatted comment:\n')
print(format_comment(score))