# <center>ChatBot "Python"</center>

## Libraries

In [1]:
import requests, time, re, random, hashlib, json, codecs, os, io, textwrap, copy, zipfile
import spacy
import pandas as pd
import numpy as np
import joblib

from bs4 import BeautifulSoup
from gensim.models import Word2Vec
from annoy import AnnoyIndex
from tqdm import tqdm

from rank_bm25 import BM25Okapi
from sklearn.model_selection import GroupShuffleSplit
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.svm import LinearSVC
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, f1_score, classification_report

## Build Product PyPi database

In [2]:
def get_package_names(url, max_n = None):
    """Extract package names from stats page."""

    html = requests.get(url, timeout = 20).text
    soup = BeautifulSoup(html, 'lxml')
    names = []
    for a in soup.select("a[href*='pypi.org/project/']"):
        name = (a.get_text() or "").strip()
        if name and re.fullmatch(r"[A-Za-z0-9][A-Za-z0-9._\-]*", name):
            names.append(name)

    # Duplicate preserving order
    seen = set(); uniq = []
    for n in names:
        if n not in seen:
            uniq.append(n); seen.add(n)
    if max_n is not None:
        uniq = uniq[:int(max_n)]
    return uniq

def strip_html(s: str) -> str:
    if not isinstance(s, str):
        return ""
    try:
        return BeautifulSoup(s, 'lxml').get_text(" ", strip = True)
    except Exception:
        # Fallback: crude tag strip
        return re.sub(r"<[^>]+>", " ", s)

def fetch_pypi_pkg(name: str):
    """Fetch title / description from PyPI JSON API."""

    url = f"https://pypi.org/pypi/{name}/json"
    r = requests.get(url, timeout = 20)
    r.raise_for_status()
    info = r.json().get('info', {})
    title = info.get("name", name) or name
    desc = info.get('summary') or info.get('description') or ""
    return {
        'product_id' : name,
        'title' : str(title),
        'description' : strip_html(desc)
    }

In [3]:
SEED_URL_internet = 'https://n0x5.github.io/PyPI_Stats/internet.html'

# 1) get the names from stats page

names_internet = get_package_names(SEED_URL_internet, max_n = 2000)
print(f"Found {len(names_internet)} package names_internet on the page.")

# 2) enrich via PyPI JSON API

rows = []
for i, pkg in enumerate(names_internet, 1):
    try:
        rows.append(fetch_pypi_pkg(pkg))
    except Exception as e:
        print('skip', pkg, "->", e)
    time.sleep(0.2)


product_internet = pd.DataFrame(rows)
product_internet.dropna(subset = ['title'], inplace = True)
product_internet['product_id'] = product_internet['product_id'].astype(str)
product_internet['title'] = product_internet['title'].astype(str)
product_internet['description']= product_internet['description'].fillna("").astype(str)
product_internet.head()

Found 1062 package names_internet on the page.
skip office365-rest-client -> 404 Client Error: Not Found for url: https://pypi.org/pypi/office365-rest-client/json
skip turtle -> 404 Client Error: Not Found for url: https://pypi.org/pypi/turtle/json
skip flask-bootstrapforms -> 404 Client Error: Not Found for url: https://pypi.org/pypi/flask-bootstrapforms/json
skip django-toolbelt -> 404 Client Error: Not Found for url: https://pypi.org/pypi/django-toolbelt/json


Unnamed: 0,product_id,title,description
0,urllib3,urllib3,HTTP library with thread-safe connection pooli...
1,requests,requests,Python HTTP for Humans.
2,idna,idna,Internationalized Domain Names in Applications...
3,google-api-core,google-api-core,Google API client core library
4,google-auth,google-auth,Google Authentication Library


In [4]:
SEED_URL_multimedia = 'https://n0x5.github.io/PyPI_Stats/multimedia.html'

# 1) get the names from stats page

names_multimedia = get_package_names(SEED_URL_multimedia, max_n = 500)
print(f"Found {len(names_multimedia)} package names_multimedia on the page.")

# 2) enrich via PyPI JSON API

rows = []
for i, pkg in enumerate(names_multimedia, 1):
    try:
        rows.append(fetch_pypi_pkg(pkg))
    except Exception as e:
        print('skip', pkg, "->", e)
    time.sleep(0.2)


product_multimedia = pd.DataFrame(rows)
product_multimedia.dropna(subset = ['title'], inplace = True)
product_multimedia['product_id'] = product_multimedia['product_id'].astype(str)
product_multimedia['title'] = product_multimedia['title'].astype(str)
product_multimedia['description']= product_multimedia['description'].fillna("").astype(str)
product_multimedia.head()

Found 192 package names_multimedia on the page.


Unnamed: 0,product_id,title,description
0,pillow,pillow,Python Imaging Library (Fork)
1,emoji,emoji,Emoji for Python
2,seaborn,seaborn,Statistical data visualization
3,imagesize,imagesize,Getting image size from png/jpeg/jpeg2000/gif ...
4,resampy,resampy,Efficient signal resampling


In [5]:
assert set(product_internet.columns) == set(product_multimedia.columns), "Mismatched coolumns!"

In [6]:
product = pd.concat([product_internet, product_multimedia], ignore_index = True).drop_duplicates()
product.info()

<class 'pandas.core.frame.DataFrame'>
Index: 1221 entries, 0 to 1249
Data columns (total 3 columns):
 #   Column       Non-Null Count  Dtype 
---  ------       --------------  ----- 
 0   product_id   1221 non-null   object
 1   title        1221 non-null   object
 2   description  1221 non-null   object
dtypes: object(3)
memory usage: 38.2+ KB


In [7]:
# Ensure some must-have packages are present, then save

MUST_HAVE = [
    "numpy","pandas","matplotlib","scipy","scikit-learn","seaborn","pytest",
    "flask","django","fastapi","pydantic","sqlalchemy","requests","uvicorn"
]

def _canon_pkg(s: str) -> str:
    s = str(s or "").strip().lower().replace("_","-")
    return re.sub(r"[^a-z0-9\-.]+","", s)

def has_pkg(df, pkg):
    canon = _canon_pkg(pkg)
    by_id = df["product_id"].astype(str).str.lower().str.replace("_","-", regex=False).eq(canon).any()
    by_title = df["title"].astype(str).str.lower().str.replace("_","-", regex=False).str.contains(rf"\b{re.escape(canon)}\b").any()
    return bool(by_id or by_title)

missing = [p for p in MUST_HAVE if not has_pkg(product, p)]
rows = []
for pkg in missing:
    try:
        rows.append(fetch_pypi_pkg(pkg)); time.sleep(0.2)
    except Exception as e:
        print("skip", pkg, "->", e)

if rows:
    product = pd.concat([product, pd.DataFrame(rows)], ignore_index=True)
    product.drop_duplicates("product_id", keep="first", inplace=True)
    print("Added:", [r["product_id"] for r in rows])
else:
    print("All must-have packages already present.")

product.to_csv("products_pypi.csv", index = False)

Added: ['numpy', 'matplotlib', 'scipy', 'scikit-learn']


In [8]:
# Sanity check

for name in ["numpy","matplotlib","scipy","scikit-learn","requests"]:
    hits = product[product["product_id"].str.lower().eq(name)][["product_id","title"]].head(3)
    print(f"\n{name} present? {not hits.empty}")
    display(hits)


numpy present? True


Unnamed: 0,product_id,title
1221,numpy,numpy



matplotlib present? True


Unnamed: 0,product_id,title
1222,matplotlib,matplotlib



scipy present? True


Unnamed: 0,product_id,title
1223,scipy,scipy



scikit-learn present? True


Unnamed: 0,product_id,title
1224,scikit-learn,scikit-learn



requests present? True


Unnamed: 0,product_id,title
1,requests,requests


## Load Python FAQ Dataset & text preprocessing

#### Load FAQ dataset

In [9]:
# Robust FAQ csv / zip loader (handles, cp1252 / latin1, comma / tsv)

faq_path = "Python FAQ Dataset.zip"  

if faq_path.lower().endswith(".zip"):
    with zipfile.ZipFile(faq_path) as z:
        names = z.namelist()
        # pick the first csv/tsv-ish file
        inner = next((n for n in names if n.lower().endswith((".csv",".tsv",".txt"))), names[0])
        raw = z.read(inner)
        last_err = None
        for enc in ["utf-8","utf-8-sig","cp1252","latin1","iso-8859-1"]:
            try:
                data = io.BytesIO(raw.decode(enc).encode("utf-8"))
                faq_raw = pd.read_csv(data, sep=None, engine="python", on_bad_lines="skip")
                print(f"Loaded {inner} from ZIP with encoding {enc}; rows={len(faq_raw)}")
                break
            except Exception as e:
                last_err = e
        else:
            raise last_err
else:
    last_err = None
    for enc in ["utf-8","utf-8-sig","cp1252","latin1","iso-8859-1"]:
        try:
            faq_raw = pd.read_csv(faq_path, encoding=enc, sep=None, engine="python", on_bad_lines="skip")
            print(f"Loaded {faq_path} with encoding {enc}; rows={len(faq_raw)}")
            break
        except Exception as e:
            last_err = e
    else:
        raise last_err
faq_raw.head()
print("Columns:", list(faq_raw.columns))

Loaded Python FAQ Dataset.csv from ZIP with encoding cp1252; rows=138
Columns: ['Questions', 'Answers']


In [10]:
lower_map = {c.lower().strip(): c for c in faq_raw.columns}
CAND_Q = ["question","questions","q","prompt","ask"]
CAND_A = ["answer","answers","a","response","reply"]

def pick(colmap, cands):
    for c in cands:
        if c in colmap:
            return colmap[c]
    return None

cq = pick(lower_map, CAND_Q)
ca = pick(lower_map, CAND_A)

if cq and ca:
    Q = faq_raw[cq].astype(str)
    A = faq_raw[ca].astype(str)
else:
    # fallback: first two non-empty columns
    nonempty = [c for c in faq_raw.columns if faq_raw[c].notna().any()]
    if len(nonempty) < 2:
        raise ValueError("Could not find two non-empty columns for FAQ.")
    Q = faq_raw[nonempty[0]].astype(str)
    A = faq_raw[nonempty[1]].astype(str)
    print(f"WARNING: fell back to columns: {nonempty[:2]}")

# --- clean, drop empties/dupes ---
Q = Q.fillna("").str.strip()
A = A.fillna("").str.strip()
mask = (Q != "") & (A != "")
Q = Q[mask]; A = A[mask]
faq_df = pd.DataFrame({"Q": Q.values, "A": A.values}).drop_duplicates("Q")
faq_pairs = list(faq_df.itertuples(index=False, name=None))  # list[(Q,A)]
print(f"FAQ pairs: {len(faq_pairs)}")

FAQ pairs: 121


#### English preprocessing (spaCy lemmatization + stopwords)

In [11]:
# English preprocessing (spaCy lemmatization if available, otherwise simple)

WH_KEEP = {"why","what","how","where","when","which","who","whom","whose"}
WORD_RE = re.compile(r"[A-Za-z0-9][A-Za-z0-9._\-]*")  # keep digits/dot/underscore/hyphen

try:
    nlp = spacy.load("en_core_web_sm", exclude=["parser","ner","textcat","senter"])
    STOP = set(nlp.Defaults.stop_words) - WH_KEEP

    def preprocess_en(text: str):
        toks = WORD_RE.findall(str(text).lower())
        if not toks:
            return []
        doc = nlp(" ".join(toks))  # tagger is enabled → no W108
        out = []
        for t in doc:
            lem = t.lemma_.strip()
            if not lem or lem in STOP:
                continue
            out.append(lem)
        return out

    print("spaCy enabled preprocessing")

except Exception:
    # fallback tokenizer/stopwords
    from sklearn.feature_extraction.text import ENGLISH_STOP_WORDS
    STOP = set(ENGLISH_STOP_WORDS) - WH_KEEP

    def preprocess_en(text: str):
        toks = WORD_RE.findall(str(text).lower())
        return [t for t in toks if t and t not in STOP]

    print("spaCy unavailable -> using simple tokenizer")

spaCy enabled preprocessing


## Train classifier (product vs chatter)

In [12]:
# Build labeled data

product = pd.read_csv('products_pypi.csv')
product['text_clean'] = (product.title.fillna("") + " " + product.description.fillna(""))

product.head()

Unnamed: 0,product_id,title,description,text_clean
0,urllib3,urllib3,HTTP library with thread-safe connection pooli...,urllib3 HTTP library with thread-safe connecti...
1,requests,requests,Python HTTP for Humans.,requests Python HTTP for Humans.
2,idna,idna,Internationalized Domain Names in Applications...,idna Internationalized Domain Names in Applica...
3,google-api-core,google-api-core,Google API client core library,google-api-core Google API client core library
4,google-auth,google-auth,Google Authentication Library,google-auth Google Authentication Library


In [13]:
random.seed(42)

def norm_text(s: str) -> str:
    return re.sub(r"\s+", " ", str(s).strip().lower())

# synthesize short product queries (text, product_id)

def synth_product_queries(df: pd.DataFrame, per_item = 6, seed = 42):
    random.seed(seed)
    T = [
        "what is {name}", "how to use {name}", "pip install {name}",
        "{name} tutorial", "{name} examples", "{name} docs", "{name} vs {alt}"
    ]
    names = df['product_id'].astype(str).tolist()
    rows = []
    for name in names:
        alts = [n for n in names if n != names] or [name]
        for _ in range(per_item):
            tpl = random.choice(T)
            alt = random.choice(alts)
            q = tpl.format(name = name, alt = alt)
            rows.append({'text' : q, 'y' : 1, 'group' : f'prod_{name}'})
    return pd.DataFrame(rows)

# Positive from PyPI products

pos_df = synth_product_queries(product, per_item = 8)

# Negative from FAQ only

neg_q = [q for q, _ in faq_pairs]
neg_df = pd.DataFrame({'text': neg_q, 'y' : 0})
neg_df['group'] = neg_df['text'].map(lambda s: 'neg_' + hashlib.md5(norm_text(s).encode()).hexdigest()) # to avoid duplicates

# Drop exact duplicates

pos_df = pos_df.drop_duplicates('text').reset_index(drop = True)
neg_df = neg_df.drop_duplicates('text').reset_index(drop = True)

# Balance sizes

m = min(len(pos_df), len(neg_df))
pos_s = pos_df.sample(n = m, random_state = 42)
neg_s = neg_df.sample(n = m, random_state = 42)

data = pd.concat([pos_s, neg_s], ignore_index = True)
X = data['text'].tolist()
y = data['y'].values
groups = data['group'].values

# Grouped split: no leakage of product queries across folds

gss = GroupShuffleSplit(n_splits = 1, test_size = 0.2, random_state = 42)
train_idx, val_idx = next(gss.split(X, y, groups))

X_train = [X[i] for i in train_idx]; y_train = y[train_idx]
X_val   = [X[i] for i in val_idx];   y_val   = y[val_idx]

print('Exact overlap after grouped split:', len(set(X_train) & set(X_val)))

Exact overlap after grouped split: 0


In [14]:
vec = TfidfVectorizer(
    ngram_range = (1, 2),
    min_df = 2,
    max_df = 0.95,
    stop_words = 'english',
    token_pattern = r"[A-Za-z]{2,}",
    lowercase = True,
    strip_accents = 'unicode'
)

pipe_svc = Pipeline([
    ('tfidf', vec),
    ('clf', LinearSVC(random_state = 42))
])
pipe_svc.fit(X_train, y_train)
pred_svc = pipe_svc.predict(X_val)

print('SVC accuracy:', round(accuracy_score(y_val, pred_svc), 4), 'F1:', round(f1_score(y_val, pred_svc), 4))
print('\n', classification_report(y_val, pred_svc))

pipe_lr = Pipeline([
    ('tfidf', vec),
    ('clf', LogisticRegression(max_iter = 1000, random_state = 42))
])
pipe_lr.fit(X_train, y_train)
pred_lr = pipe_lr.predict(X_val)

print('\nLR accuracy:', round(accuracy_score(y_val, pred_lr), 4), "F1:", round(f1_score(y_val, pred_lr), 4))
print("\n", classification_report(y_val, pred_lr))

SVC accuracy: 0.9388 F1: 0.9302

               precision    recall  f1-score   support

           0       0.90      1.00      0.95        26
           1       1.00      0.87      0.93        23

    accuracy                           0.94        49
   macro avg       0.95      0.93      0.94        49
weighted avg       0.95      0.94      0.94        49


LR accuracy: 0.9388 F1: 0.9302

               precision    recall  f1-score   support

           0       0.90      1.00      0.95        26
           1       1.00      0.87      0.93        23

    accuracy                           0.94        49
   macro avg       0.95      0.93      0.94        49
weighted avg       0.95      0.94      0.94        49



In [15]:
joblib.dump(pipe_svc, "models/product_query_svc.pkl")
joblib.dump(pipe_lr,  "models/product_query_lr.pkl")

['models/product_query_lr.pkl']

In [16]:
def clean_pair(q,a):
    q = re.sub(r"\s+"," ", str(q)).strip()
    a = re.sub(r"\s+"," ", str(a)).strip()
    return q, a

faq_pairs = [clean_pair(q,a) for q,a in faq_pairs]
faq_pairs = [(q,a) for q,a in faq_pairs
             if len(a) >= 40 and "python software foundation" not in a.lower()]

vec = TfidfVectorizer(min_df=2, max_df=0.9).fit([a for _,a in faq_pairs])
A = vec.transform([a for _,a in faq_pairs])
sim = cosine_similarity(A)

keep = []
seen = set()
for i in range(len(faq_pairs)):
    if i in seen: continue
    dup_idx = np.where(sim[i] > 0.85)[0]
    for j in dup_idx: seen.add(j)
    keep.append(i)

faq_pairs = [faq_pairs[i] for i in keep]
print("Kept:", len(faq_pairs))

Kept: 117


## Train Word2Vec & build Annoy indexes

In [17]:
# Add a lexical first stage (BM25) 

faq_q_tok_bm25 = [preprocess_en(q) for q,_ in faq_pairs]
bm25 = BM25Okapi(faq_q_tok_bm25)

def _faq_bm25_lookup(query, k=3, min_score=1.5):
    toks = preprocess_en(query)
    if not toks: return None
    scores = bm25.get_scores(toks)
    idx = int(np.argmax(scores))
    if scores[idx] >= min_score:
        return faq_pairs[idx][1]  # answer
    return None

In [18]:
# Tokenize products and FAQ

prod_title_tok = product['title'].fillna("").apply(preprocess_en).tolist()
prod_desc_tok = product['description'].fillna("").apply(preprocess_en).tolist()
faq_q_tok = [preprocess_en(q) for q,_ in faq_pairs]

# Train W2V on combined corpus for good coverage

w2v = Word2Vec(
    sentences = prod_title_tok + prod_desc_tok + faq_q_tok,
    vector_size = 100, window = 5, min_count = 1, sg = 1,
    negative = 10, workers = os.cpu_count(), epochs = 10, seed = 42
)

w2v.wv.fill_norms()
D = w2v.vector_size

def encode_tokens(tokens):
    vec = np.zeros(D, dtype = np.float32); n = 0
    for w in tokens:
        if w in w2v.wv:
            vec += w2v.wv.get_vector(w, norm = True); n += 1
    if n == 0: return None
    vec /= n; nrm = np.linalg.norm(vec)
    if nrm == 0: return None
    return (vec / nrm).astype(np.float32)

In [19]:
# Product Annoy

prod_index = AnnoyIndex(D, 'angular'); prod_map = {}; k = 0
for r in product.itertuples(index = False):
    t = preprocess_en(getattr(r, 'title', ""))
    d = preprocess_en(getattr(r, 'description', ""))
    v = encode_tokens(t + d)
    if v is None:
        continue
    prod_index.add_item(k, v)
    prod_map[k] = {'product_id' : str(r.product_id), 'title' : str(r.title)}
    k += 1

prod_index.build(30)
os.makedirs("models", exist_ok = True)
prod_index.save("models/product.ann")
with open('models/product_map.json', 'w', encoding = 'utf-8') as f:
    json.dump(prod_map, f, ensure_ascii = False)


# FAQ Annoy

faq_index = AnnoyIndex(D, 'angular'); faq_map = {}; j = 0
_kept_questions = []
for q, a in faq_pairs:
    v = encode_tokens(preprocess_en(q))
    if v is None:
        continue
    faq_index.add_item(j, v)
    faq_map[j] = a
    _kept_questions.append(q)   # save only those actually indexed
    j += 1

faq_index.build(10)
faq_index.save('models/faq.ann')
with open('models/faq_map.json', 'w', encoding = 'utf-8') as f:
    json.dump(faq_map, f, ensure_ascii = False)

# Save the FAQ questions for future lexical matching in fresh sessions

with open("models/faq_q.json","w",encoding="utf-8") as f:
    json.dump(_kept_questions, f, ensure_ascii=False)
    
# Save vectors

w2v.save("models/w2v_model.model")
w2v.wv.save("models/w2v_vectors.kv")

print(f"Indexed products: {k} | FAQ: {j}")

Indexed products: 1225 | FAQ: 117


## Final router: get_answer()

In [20]:
# Load artifacts (when starting a new session)

product = pd.read_csv("products_pypi.csv")
product["product_id"] = product["product_id"].astype(str)
product["title"] = product["title"].fillna("").astype(str)
product["description"] = product["description"].fillna("").astype(str)

# Load W2V to get dimension D

from gensim.models import Word2Vec
w2v = Word2Vec.load("models/w2v_model.model")
w2v.wv.fill_norms()
D = w2v.vector_size

def encode_tokens(tokens):
    vec = np.zeros(D, dtype=np.float32); n = 0
    for w in tokens:
        if w in w2v.wv:
            vec += w2v.wv.get_vector(w, norm=True); n += 1
    if n == 0: return None
    vec /= n; nrm = np.linalg.norm(vec)
    if nrm == 0: return None
    return (vec / nrm).astype(np.float32)

svc = joblib.load("models/product_query_svc.pkl")
try:
    lr  = joblib.load("models/product_query_lr.pkl")
except Exception:
    lr = None

prod_index = AnnoyIndex(D, "angular"); prod_index.load("models/product.ann")
faq_index  = AnnoyIndex(D, "angular"); faq_index.load("models/faq.ann")

with open("models/product_map.json","r",encoding="utf-8") as f:
    prod_map = {int(k): v for k, v in json.load(f).items()}
with open("models/faq_map.json","r",encoding="utf-8") as f:
    faq_map  = {int(k): v for k, v in json.load(f).items()}

In [21]:
# Build a lexical index for FAQ questions

def _encode_query(text): 
    return encode_tokens(preprocess_en(text))

def _norm(s: str) -> str:
    s = str(s or "").casefold()
    s = re.sub(r"[\"'’`´]", "", s)
    s = re.sub(r"\s+"," ", s).strip()
    return s

FAQ_STOP = {"package","library","module","install","pip","pip3","pipx",
            "pypi","on","for","with","the","a","an","in","to","of","and","is","are"}

WH = {"what","why","how","where","when","which","who","whom","whose",
      "can","are","is","does","do","did","was","were","will","shall",
      "should","could","would","may","might","must"}

GENERIC_Q = {"use","using","today","now","way","ways","make","get","work",
             "works","find","need","want","example","examples","show","tell","help"}

FAQ_OVERLAP_STOP = FAQ_STOP | WH | GENERIC_Q

# Try to get questions from memory (faq_pairs) or fallback to saved file

_faq_questions = []
try:
    _faq_questions = [q for q,_ in faq_pairs]
except Exception:
    try:
        with open("models/faq_q.json","r",encoding="utf-8") as f:
            _faq_questions = json.load(f)
    except Exception:
        _faq_questions = []

faq_lex = []
if _faq_questions:
    for i, q in enumerate(_faq_questions):
        a = faq_map.get(i, "")
        nq = _norm(q)
        # tokens for exact match (your previous logic)
        toks_exact = set(t for t in re.findall(r"[a-z0-9]+", nq) if t not in FAQ_STOP)
        # stricter tokens for overlap checks (drops how/use/etc.)
        toks_overlap = set(t for t in re.findall(r"[a-z0-9]+", nq) if t not in FAQ_OVERLAP_STOP)
        faq_lex.append({"nq": nq, "toks": toks_exact, "toks_overlap": toks_overlap, "ans": a})


def _faq_lexical_lookup(query: str, min_overlap: int = 2):
    if not faq_lex:
        return None
    qn = _norm(query)

    # exact match (uses broader tokens logic)
    for rec in faq_lex:
        if rec["nq"] == qn:
            return rec["ans"]

    # overlap on stricter tokens
    qtok = set(t for t in re.findall(r"[a-z0-9]+", qn) if t not in FAQ_OVERLAP_STOP)
    best, best_ov = None, 0
    for rec in faq_lex:
        ov = len(qtok & rec["toks_overlap"])
        if ov > best_ov:
            best, best_ov = rec, ov
    if best and best_ov >= min_overlap:
        return best["ans"]
    return None

In [22]:
# Quick sanity check

def _diag(q):
    print("\nQ:", q)
    print("FAQ lexical:", _faq_lexical_lookup(q, min_overlap=3))
    qv = _encode_query(q)
    if qv is None:
        print("qv: None")
        return
    # best FAQ/Product similarities (Annoy angular -> cosine-like)
    ids_f, dists_f = faq_index.get_nns_by_vector(qv, 3, include_distances=True)
    ids_p, dists_p = prod_index.get_nns_by_vector(qv, 3, include_distances=True)
    fs = [1 - (d*d)/2.0 for d in dists_f]
    ps = [1 - (d*d)/2.0 for d in dists_p]
    print("FAQ best sim:", round(fs[0],3) if fs else None)
    print("PROD best sim:", round(ps[0],3) if ps else None)

_diag("why was python created in the first place?")
_diag("are there any books on python?")


Q: why was python created in the first place?
FAQ lexical: Here’s a very brief summary of what started it all, written by Guido van Rossum: I had extensive experience with implementing an interpreted language in the ABC group at CWI, and from working with this group I had learned a lot about language design. This is the origin of many Python features, including the use of indentation for statement grouping and the inclusion of very-high-level data types (although the details are all different in Python). I had a number of gripes about the ABC language, but also liked many of its features. It was impossible to extend the ABC language (or its implementation) to remedy my complaints – in fact its lack of extensibility was one of its biggest problems. I had some experience with using Modula-2+ and talked with the designers of Modula-3 and read the Modula-3 report. Modula-3 is the origin of the syntax and semantics used for exceptions, and some other Python features. I was working in the A

In [23]:
# Name handling for products (kept here so it runs after product load)

def _canon_pkg(s: str) -> str:
    s = str(s or "").strip().lower().replace("_","-")
    return re.sub(r"[^a-z0-9\-.]+", "", s)

name_index = {}
for r in product.itertuples(index=False):
    pid   = _canon_pkg(getattr(r, "product_id", ""))
    title = str(getattr(r, "title", "")) or pid
    if not pid:
        continue
    rec = {"product_id": str(getattr(r, "product_id")), "title": title}
    name_index[pid] = rec
    alias = _canon_pkg(title)
    if alias and alias not in name_index:
        name_index[alias] = rec
    if alias.startswith("python-"):
        core = alias[len("python-"):]
        if core and core not in name_index:
            name_index[core] = rec
LEX_STOP = {
    "python","package","library","module","install","pip","pip3","pipx","pypi",
    "on","for","with","the","a","an","in","to","of","and","is","are"
}

def _name_hit(query: str):
    q = str(query or "").lower()
    m = re.search(r"\b(?:pip|pip3|pipx)\s+install\s+([a-z0-9._\-]+)\b", q)
    if not m:
        return None
    cand = _canon_pkg(m.group(1))
    if cand in name_index:
        rec = name_index[cand]
        return f"{rec['product_id']} {rec['title']}".strip()
    return None

# Build product lexical index with token sets

PRODUCT_TOKEN_RE = re.compile(r"[a-z0-9]+")

def _tokset(text, stop = LEX_STOP):
    return set(t for t in PRODUCT_TOKEN_RE.findall(_norm(text)) if t not in stop)
    
product_lex = []
for r in product.itertuples(index=False):
    title = str(getattr(r, "title", ""))
    desc  = str(getattr(r, "description", ""))
    product_lex.append({
        "product_id": str(r.product_id),
        "title": title,
        "norm_title": _norm(title),
        "title_toks": _tokset(title, stop = LEX_STOP),    # title tokens
        "desc_toks":  _tokset(desc, stop = LEX_STOP),     # description tokens
    })

def _lexical_product_lookup(query: str):
    qn = _norm(query)
    if not qn:
        return None

    # exact title fast path
    for rec in product_lex:
        if rec["norm_title"] == qn:
            return f"{rec['product_id']} {rec['title']}".strip()

    qtok = _tokset(qn, stop = LEX_STOP)
    
    best, best_title_overlap, best_total = None, 0, 0
    for rec in product_lex:
        ot = len(qtok & rec["title_toks"])
        od = len(qtok & rec["desc_toks"])
        total = ot + od
        
        # prefer more title overlap, then total overlap
        if (ot, total) > (best_title_overlap, best_total):
            best, best_title_overlap, best_total = rec, ot, total

    # REQUIRE >= 2 title tokens overlapped (prevents single "requests" matches)
    if best and best_title_overlap >= 2:
        return f"{best['product_id']} {best['title']}".strip()
    return None

In [24]:
def _best_sim_product(qv, k=3):
    ids, dists = prod_index.get_nns_by_vector(qv, k, include_distances=True)
    if not ids: 
        return None, -1.0
    sims = [1.0 - (d*d)/2.0 for d in dists]  # Annoy angular -> cosine-like
    return ids[0], sims[0]

def _best_sim_faq(qv, k=3):
    ids, dists = faq_index.get_nns_by_vector(qv, k, include_distances=True)
    if not ids:
        return None, -1.0
    sims = [1.0 - (d*d)/2.0 for d in dists]
    return ids[0], sims[0]

In [25]:
def _sigmoid(x: float) -> float:
    return 1.0 / (1.0 + np.exp(-x))

def _score_positive(model, text: str):
    """ Return P (product) if available, else sigmoid(decision_function),
    else None if the model exposes neither. """

    if model is None:
        return None
    try:
        return float(model.predict_proba([text])[0][1])
    except Exception:
        pass
    try:
        margin = float(model.decision_function([text])[0])
        return _sigmoid(margin)
    except Exception:
        return None

def is_product_query(text: str, svc_floor : float = 0.50, lr_floor :float = 0.60) -> bool:
    """ Route-to-product gate:
    1) SVC must predict class 1 (product)
    2) Optionally require SVC confidence (if available)
    3) Optionally require LR confidence (if available) """

    if not str(text).strip():
        return False

    # hard decision from SVC first
    try:
        svc_pred = int(svc.predict([text])[0]) == 1 if svc is not None else False
    except Exception:
        svc_pred = False
    if not svc_pred:
        return False

    # soft check on SVC confidence
    ps = _score_positive(svc, text)
    if ps is not None and ps < svc_floor:
        return False

    # LR veto (if LR is present and exposes proba / margin)
    plr = _score_positive(lr, text)
    if plr is not None and plr < lr_floor:
        return False
    return True

In [26]:
# Quick self-test

print(_name_hit("pip install numpy"))            
print(_name_hit("how to use requests?"))         

print(_lexical_product_lookup("requests"))       
print(_lexical_product_lookup("scikit learn"))   
print(_lexical_product_lookup("how to use requests?"))  

numpy numpy
None
requests requests
scikit-learn scikit-learn
None


In [27]:
# Helper: query tokens (for FAQ domain) using your lighter FAQ_STOP

def _qtoks_overlap(q: str):
    return set(t for t in re.findall(r"[a-z0-9]+", _norm(q)) if t not in FAQ_OVERLAP_STOP)

# Require semantic FAQ hits to also share some words with the FAQ question

def _faq_semantic_ok(query: str, fid: int | None, fsim: float, min_overlap: int = 2) -> bool:
    if fid is None or fsim < FAQ_SIM_TH:
        return False
    try:
        cand = faq_lex[fid]
    except Exception:
        return False
    return len(_qtoks_overlap(query) & cand["toks_overlap"]) >= min_overlap

In [28]:
# Preference thresholds

PROD_SIM_TH = 0.60   # product must be fairly strong
FAQ_SIM_TH  = 0.35   # FAQ can be a bit looser
PREF_MARGIN = 0.10   # product must beat FAQ by this much to win

_WH_WORDS = {"what","why","how","where","when","which","who","whom","whose","can","are","is","does","do"}

def _looks_like_question(q: str) -> bool:
    s = (q or "").strip().lower()
    if not s: return False
    if s.endswith("?"): return True
    # starts with wh-word or auxiliary ("can", "are", "is", "does", "do")
    first = re.findall(r"^[a-z]+", s)
    return bool(first and first[0] in _WH_WORDS)

def get_answer(query: str):
    # 0) explicit package name / pip install
    hit = _name_hit(query)
    if hit:
        return hit

    # 1) FAQ lexical FIRST (slightly looser: 2)
    hit = _faq_lexical_lookup(query, min_overlap=2)
    if hit:
        return hit

    # 2) product lexical (strict)
    hit = _lexical_product_lookup(query)
    if hit:
        return hit

    # 3) Encode once and compare sims
    qv = _encode_query(query)
    if qv is None:
        return "Sorry, I didn't understand."

    pid, psim = _best_sim_product(qv, k=3)
    fid, fsim = _best_sim_faq(qv, k=3)

    # 3a) If it looks like a QUESTION, bias toward FAQ — but require lexical overlap too
    if _looks_like_question(query):
        if _faq_semantic_ok(query, fid, fsim, min_overlap=2) and \
           (fsim + PREF_MARGIN/2 >= psim or psim < PROD_SIM_TH + 0.05):
            return faq_map[fid] if fid is not None else "Sorry, I couldn't find an answer."

    # 3b) Otherwise, prefer FAQ unless product is clearly stronger — and require overlap
    if _faq_semantic_ok(query, fid, fsim, min_overlap=2) and \
       (fsim + PREF_MARGIN >= psim or psim < PROD_SIM_TH):
        return faq_map[fid] if fid is not None else "Sorry, I couldn't find an answer."

    # Allow product only if strong and clearly better
    if psim >= PROD_SIM_TH and (psim >= fsim + PREF_MARGIN):
        item = prod_map[pid]
        return f"{item['product_id']} {item['title']}".strip()

    # 4) Tie-breaker via classifier (still conservative)
    if is_product_query(query, svc_floor=0.55, lr_floor=0.65):
        if psim >= PROD_SIM_TH - 0.05 and (psim >= fsim + PREF_MARGIN/2):
            item = prod_map[pid]
            return f"{item['product_id']} {item['title']}".strip()

    # 5) Default to FAQ only if semantic hit also passes overlap; else fallback
    if _faq_semantic_ok(query, fid, fsim, min_overlap=2):
        return faq_map[fid]
    return "Sorry, I couldn't find an answer."

In [29]:
print(get_answer('pip install numpy'))

numpy numpy


In [30]:
print(get_answer("are there any books on python?"))

Yes, there are many, and more are being published. See the python.org wiki at https://wiki.python.org/moin/PythonBooks for a list. You can also search online bookstores for “Python” and filter out the Monty Python references; or perhaps search for “Python” and “language”.


In [31]:
print(get_answer("scikit learn"))

scikit-learn scikit-learn


In [32]:
print(get_answer('what is the weather today'))

Sorry, I couldn't find an answer.


In [33]:
print(get_answer("how to use requests?")) 

Sorry, I couldn't find an answer.


In [34]:
print(get_answer("install package on windows"))

Sorry, I couldn't find an answer.
