In [9]:
# Cell 1 — Install packages
print("Installing packages...")

try:
    from google.colab import userdata
    import os
    hf = userdata.get('HF_TOKEN'); ngrok_tok = userdata.get('NGROK_TOKEN')
    smtp_e = userdata.get('SMTP_EMAIL'); smtp_p = userdata.get('SMTP_PASSWORD')
    if hf: os.environ['HF_TOKEN'] = hf
    if ngrok_tok: os.environ['NGROK_TOKEN'] = ngrok_tok
    if smtp_e: os.environ['SMTP_EMAIL'] = smtp_e
    if smtp_p: os.environ['SMTP_PASSWORD'] = smtp_p
except Exception:
    pass

!pip install --upgrade pip -q
!pip install transformers accelerate bitsandbytes -q
!pip install streamlit pyngrok PyJWT bcrypt pandas plotly matplotlib scikit-learn -q

print("Done.")


Installing packages...
Done.


In [10]:
# Cell 2 — create directories
import os

# Target folders for the %%writefile cells below and for uploaded avatars.
for folder in ("backend", "streamlit_app/avatars"):
    os.makedirs(folder, exist_ok=True)
print("Directories created:", os.listdir("."))


Directories created: ['.config', 'streamlit.log', 'streamlit.err', 'backend', 'app.py', 'streamlit_app', 'sample_data']


In [11]:
%%writefile backend/storage.py
# backend/storage.py
import os, json, threading, base64
from datetime import datetime
# Module-wide lock; the helpers below that use `with lock` serialize their file access through it.
lock = threading.Lock()

# Data directory: the "streamlit_app" folder next to this file's parent directory.
DATA_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "streamlit_app"))
os.makedirs(DATA_DIR, exist_ok=True)  # created at import time if it does not exist
USERS_FILE = os.path.join(DATA_DIR, "users.json")        # initialised to [] by _ensure
ACTIVITY_FILE = os.path.join(DATA_DIR, "activity.json")  # initialised to [] by _ensure
INDEX_FILE = os.path.join(DATA_DIR, "index.json")        # initialised to {} by _ensure
AVATAR_DIR = os.path.join(DATA_DIR, "avatars")           # files written by save_avatar
os.makedirs(AVATAR_DIR, exist_ok=True)

def _ensure():
    """Create each JSON store that does not exist yet with its empty default.

    users.json and activity.json start as an empty list, index.json as an
    empty object. Files that already exist are left untouched.
    """
    defaults = (
        (USERS_FILE, []),
        (ACTIVITY_FILE, []),
        (INDEX_FILE, {}),
    )
    for store_path, empty_value in defaults:
        if os.path.exists(store_path):
            continue
        with open(store_path, "w", encoding="utf-8") as handle:
            json.dump(empty_value, handle)

def read_users():
    """Return the user records stored in users.json.

    Returns:
        list: the parsed records. An empty list is returned when the file
        does not contain valid JSON or contains something other than a list,
        so callers can always iterate over and append to the result.
    """
    _ensure()
    with lock:
        with open(USERS_FILE, "r", encoding="utf-8") as f:
            try:
                users = json.load(f)
            except ValueError:
                # json.JSONDecodeError and UnicodeDecodeError are ValueError
                # subclasses; anything else (e.g. KeyboardInterrupt) propagates.
                return []
    return users if isinstance(users, list) else []

def write_users(users):
    """Replace the contents of users.json with `users`.

    The data is written as indented JSON while holding the module lock;
    values json cannot encode natively are converted with str().
    """
    with lock, open(USERS_FILE, "w", encoding="utf-8") as handle:
        json.dump(users, handle, indent=2, default=str)

def append_activity(entry: dict):
    """Stamp `entry` with the current UTC time and append it to activity.json.

    Args:
        entry: activity record; it is modified in place (a "timestamp" key
            holding a naive UTC ISO-8601 string is set).

    Returns:
        dict: the same `entry` object, now carrying the timestamp.
    """
    _ensure()
    entry["timestamp"] = datetime.utcnow().isoformat()
    with lock:
        # Read-modify-write happens under one lock acquisition so concurrent
        # appends cannot overwrite each other.
        with open(ACTIVITY_FILE, "r", encoding="utf-8") as f:
            try:
                arr = json.load(f)
            except ValueError:
                # Unparsable log: start over with an empty list.
                arr = []
        if not isinstance(arr, list):
            # Valid JSON of the wrong shape would otherwise break arr.append.
            arr = []
        arr.append(entry)
        with open(ACTIVITY_FILE, "w", encoding="utf-8") as f:
            json.dump(arr, f, indent=2, default=str)
    return entry

def read_activity():
    """Return the activity log stored in activity.json.

    The file is read under the module lock, like read_users, so a reader never
    observes a log that append_activity is in the middle of rewriting.

    Returns:
        list: the logged entries, or an empty list when the file does not
        contain a valid JSON list.
    """
    _ensure()
    with lock:
        with open(ACTIVITY_FILE, "r", encoding="utf-8") as f:
            try:
                activities = json.load(f)
            except ValueError:
                return []
    return activities if isinstance(activities, list) else []

def write_index(idx: dict):
    """Replace the contents of index.json with `idx` (indented JSON, written under the module lock)."""
    with lock, open(INDEX_FILE, "w", encoding="utf-8") as handle:
        json.dump(idx, handle, indent=2)

def read_index():
    """Return the search index stored in index.json.

    The file is read under the module lock so a reader never observes an
    index that write_index is in the middle of rewriting.

    Returns:
        dict: the stored index, or an empty dict when the file does not
        contain a valid JSON object.
    """
    _ensure()
    with lock:
        with open(INDEX_FILE, "r", encoding="utf-8") as f:
            try:
                idx = json.load(f)
            except ValueError:
                return {}
    return idx if isinstance(idx, dict) else {}

def save_avatar(user_id: str, filename: str, content_bytes: bytes):
    """Store `content_bytes` as the avatar of `user_id` and return its path.

    Args:
        user_id: id used as the file's base name.
        filename: uploaded file name; only its extension is used
            (lower-cased, ".png" when there is none).
        content_bytes: raw image bytes.

    Returns:
        str: path of the written file inside AVATAR_DIR.
    """
    # choose extension from filename
    ext = os.path.splitext(filename or "")[1].lower() or ".png"
    safe = f"{user_id}{ext}"
    path = os.path.join(AVATAR_DIR, safe)
    with open(path, "wb") as f:
        f.write(content_bytes)
    # Drop avatars this user saved earlier under another extension; otherwise
    # both files stay on disk and a lookup may return the outdated one.
    for existing in os.listdir(AVATAR_DIR):
        if existing != safe and os.path.splitext(existing)[0] == user_id:
            try:
                os.remove(os.path.join(AVATAR_DIR, existing))
            except OSError:
                # Best effort: the new avatar has been written regardless.
                pass
    return path

def get_avatar_path(user_id: str):
    """Return the path of the avatar stored for `user_id`, or None.

    A file belongs to the user when its name without extension equals
    `user_id` (the naming used by save_avatar). A bare prefix test would also
    match ids that merely start with the same characters. When several files
    match, the most recently modified one is returned.
    """
    if not user_id:
        return None
    candidates = [
        os.path.join(AVATAR_DIR, fn)
        for fn in os.listdir(AVATAR_DIR)
        if os.path.splitext(fn)[0] == user_id
    ]
    if not candidates:
        return None
    return max(candidates, key=os.path.getmtime)


Writing backend/storage.py


In [21]:
%%writefile backend/search_index.py
# backend/search_index.py (fixed absolute imports)
import re, json, os
from collections import defaultdict, Counter
from storage import read_activity, read_index, write_index

def _tokenize(text: str):
    if not text:
        return []
    tokens = re.findall(r"\w+", text.lower())
    return tokens

def build_index():
    """Rebuild the inverted index from the whole activity log and persist it.

    Each activity is identified by its position in the log. The text of its
    prompt/code/comment/snippet/username/model/module fields is tokenized and
    every token receives a posting {"doc": position, "score": occurrences}.

    Returns:
        dict: {"posting": {token: [postings]}, "docs_count": number of activities}.
    """
    activities = read_activity()
    searchable_fields = ("prompt","code","comment","snippet","username","model","module")
    posting = {}
    for doc_id, activity in enumerate(activities):
        pieces = [str(activity.get(field)) for field in searchable_fields if activity.get(field)]
        term_counts = Counter(_tokenize(" ".join(pieces)))
        for term, occurrences in term_counts.items():
            posting.setdefault(term, []).append({"doc": doc_id, "score": occurrences})
    index = {"posting": posting, "docs_count": len(activities)}
    write_index(index)
    return index

def add_activity_to_index(activity):
    """Refresh the index after an activity was logged.

    `activity` itself is not inspected: the index is rebuilt from the full
    activity log and the rebuilt index is returned.
    """
    rebuilt = build_index()
    return rebuilt

def search(query: str, top_k: int = 20):
    """Return the activities that best match `query` according to the stored index.

    A document's score is the sum of the posting scores of every query token
    it contains. The `top_k` highest-scoring documents are looked up in the
    activity log; positions past the end of the log are skipped.

    Returns:
        list: copies of the matching activities, each extended with "_score"
        and "_doc_id", best match first.
    """
    idx = read_index()
    if not idx or "posting" not in idx:
        return []
    totals = {}
    for term in _tokenize(query):
        for hit in idx["posting"].get(term, []):
            doc = hit["doc"]
            totals[doc] = totals.get(doc, 0) + hit.get("score",1)
    best = sorted(totals.items(), key=lambda pair: pair[1], reverse=True)[:top_k]
    activities = read_activity()
    results = []
    for doc_id, total in best:
        if doc_id >= len(activities):
            continue
        match = activities[doc_id].copy()
        match["_score"] = total
        match["_doc_id"] = doc_id
        results.append(match)
    return results


Overwriting backend/search_index.py


In [18]:
%%writefile backend/auth_module.py
# backend/auth_module.py (fixed: absolute imports)
import os, bcrypt, jwt, secrets, logging, smtplib
from typing import Optional, Dict
from datetime import datetime, timedelta
from email.mime.text import MIMEText

# absolute import (not relative) so module works when run as a script
from storage import read_users, write_users, append_activity

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("auth")

# Signing key for the JWTs: JWT_SECRET_KEY, else SECRET_KEY from the environment.
JWT_SECRET = os.environ.get("JWT_SECRET_KEY") or os.environ.get("SECRET_KEY")
if not JWT_SECRET:
    # No key configured: use a random per-process key instead of a constant
    # that anyone reading the source could use to forge tokens. Tokens issued
    # by this process stop validating once the process restarts.
    JWT_SECRET = secrets.token_hex(32)
    logger.warning("JWT_SECRET_KEY is not set; using a random key valid for this process only.")
JWT_ALG = "HS256"
ACCESS_EXPIRE_MINUTES = 20  # lifetime of access tokens
REFRESH_EXPIRE_DAYS = 7     # lifetime of refresh tokens

# SMTP settings used to mail OTPs; mail is only attempted when both
# SMTP_EMAIL and SMTP_PASSWORD are non-empty.
SMTP_EMAIL = os.environ.get("SMTP_EMAIL", "")
SMTP_PASSWORD = os.environ.get("SMTP_PASSWORD", "")
SMTP_SERVER = os.environ.get("SMTP_SERVER", "smtp.gmail.com")
SMTP_PORT = int(os.environ.get("SMTP_PORT", 587))

# Upper bound on accounts with role "admin", enforced by register().
MAX_ADMIN = 2

def _find_user_by_username(username):
    """Return the first stored user whose username equals `username`, ignoring case and surrounding whitespace, or None."""
    users = read_users()
    wanted = username.strip().lower()
    for candidate in users:
        if candidate.get("username","").strip().lower() == wanted:
            return candidate
    return None

def _hash_password(plain):
    """Return a bcrypt hash of `plain`, computed with a freshly generated salt, as text."""
    digest = bcrypt.hashpw(plain.encode(), bcrypt.gensalt())
    return digest.decode()

def _verify_password(plain, hashed):
    """Report whether `plain` matches the bcrypt hash `hashed`.

    Any error raised while checking (for example a malformed or empty hash)
    is treated as a mismatch.
    """
    try:
        matches = bcrypt.checkpw(plain.encode(), hashed.encode())
    except Exception:
        return False
    return matches

def _issue_tokens(user_id, role):
    """Create a signed access token and refresh token for a user.

    Both tokens carry "sub" (the user id), "role", "type" ("access" or
    "refresh") and "exp" (expiry as integer seconds since the epoch).

    Returns:
        tuple: (access_token, refresh_token).
    """
    from datetime import timezone

    # An aware UTC time is required here: .timestamp() on the naive value
    # returned by utcnow() is interpreted as local time, which shifts "exp"
    # by the UTC offset on any host whose timezone is not UTC.
    now = datetime.now(timezone.utc)

    def _payload(token_type, lifetime):
        return {
            "sub": user_id,
            "role": role,
            "exp": int((now + lifetime).timestamp()),
            "type": token_type,
        }

    access_payload = _payload("access", timedelta(minutes=ACCESS_EXPIRE_MINUTES))
    refresh_payload = _payload("refresh", timedelta(days=REFRESH_EXPIRE_DAYS))
    access = jwt.encode(access_payload, JWT_SECRET, algorithm=JWT_ALG)
    refresh = jwt.encode(refresh_payload, JWT_SECRET, algorithm=JWT_ALG)
    return access, refresh

def register(username: str, password: str, email: str="", security_q: str="", security_a: str="", make_admin: bool=False):
    """Create a user account and sign the new user in.

    Args:
        username: desired name; surrounding whitespace is removed.
        password: plain-text password (stored as a bcrypt hash).
        email: optional address stored with the account.
        security_q: optional security question, stored as given.
        security_a: optional answer (stored as a bcrypt hash).
        make_admin: request the "admin" role; refused once MAX_ADMIN admins exist.

    Returns:
        dict: {"success": True, "user_id", "access_token", "refresh_token"}
        or {"success": False, "error": message}.
    """
    username = (username or "").strip()
    # Blank credentials would create an account nobody can address safely.
    if not username:
        return {"success": False, "error": "Username is required"}
    if not password:
        return {"success": False, "error": "Password is required"}
    if _find_user_by_username(username):
        return {"success": False, "error": "Username already exists"}
    users = read_users()
    if make_admin:
        admins = [u for u in users if u.get("role")=="admin"]
        if len(admins) >= MAX_ADMIN:
            return {"success": False, "error": f"Maximum admins ({MAX_ADMIN}) already present"}
    uid = secrets.token_hex(12)
    hashed = _hash_password(password)
    new = {
        "user_id": uid,
        "username": username,
        "password_hash": hashed,
        "email": email,
        "security_q": security_q,
        "security_a_hash": _hash_password(security_a) if security_a else "",
        "role": "admin" if make_admin else "user",
        "created_at": datetime.utcnow().isoformat()
    }
    users.append(new)
    write_users(users)
    append_activity({"type":"register","user_id":uid,"username":username})
    access, refresh = _issue_tokens(uid, new["role"])
    return {"success": True, "user_id": uid, "access_token": access, "refresh_token": refresh}

def login(username: str, password: str):
    """Check the credentials and, on success, issue tokens and log the login.

    Unknown user names and wrong passwords produce the same
    "Invalid credentials" error.

    Returns:
        dict: {"success": True, "access_token", "refresh_token", "role", "user_id"}
        or {"success": False, "error": "Invalid credentials"}.
    """
    account = _find_user_by_username(username)
    if not account or not _verify_password(password, account.get("password_hash","")):
        return {"success": False, "error": "Invalid credentials"}
    user_id = account["user_id"]
    role = account.get("role","user")
    access, refresh = _issue_tokens(user_id, role)
    append_activity({"type":"login","user_id":user_id,"username":account["username"]})
    return {"success": True, "access_token": access, "refresh_token": refresh, "role": role, "user_id": user_id}

def decode_token(token: str):
    """Decode `token` with the module's secret and algorithm.

    Returns the payload on success and None when decoding raises for any reason.
    """
    try:
        return jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALG])
    except Exception:
        return None

def generate_and_send_otp(username_or_email):
    """Create a 6-digit one-time password for a user and try to email it.

    The OTP and its expiry (10 minutes from now, naive UTC) are stored on the
    user record. Mail is attempted only when the user has an email address and
    SMTP_EMAIL/SMTP_PASSWORD are configured.

    Args:
        username_or_email: user name or email address of the account,
            compared ignoring case and surrounding whitespace.

    Returns:
        dict: {"success": False, "error": "User not found"}, or
        {"success": True, "otp_sent": bool, "otp": value} where "otp" holds the
        code only when it could NOT be mailed (testing fallback) and is None
        once it has been delivered by email.
    """
    ident = str(username_or_email or "").strip().lower()
    if not ident:
        # An empty identifier would otherwise match every account that has
        # no email address stored.
        return {"success": False, "error": "User not found"}

    def _matches(user):
        name = str(user.get("username") or "").strip().lower()
        mail = str(user.get("email") or "").strip().lower()
        return ident in (name, mail)

    users = read_users()
    target = next((u for u in users if _matches(u)), None)
    if not target:
        return {"success": False, "error": "User not found"}
    otp = str(secrets.randbelow(900000)+100000)
    expiry = (datetime.utcnow() + timedelta(minutes=10)).isoformat()
    target["otp"] = otp
    target["otp_expiry"] = expiry
    write_users(users)
    sent = False
    if target.get("email") and SMTP_EMAIL and SMTP_PASSWORD:
        try:
            msg = MIMEText(f"Your CodeGenie OTP: {otp}. Expires in 10 minutes.")
            msg["From"] = SMTP_EMAIL
            msg["To"] = target["email"]
            msg["Subject"] = "CodeGenie OTP"
            # The context manager closes the connection even when
            # starttls/login/sendmail raises.
            with smtplib.SMTP(SMTP_SERVER, SMTP_PORT, timeout=10) as s:
                s.starttls()
                s.login(SMTP_EMAIL, SMTP_PASSWORD)
                s.sendmail(SMTP_EMAIL, [target["email"]], msg.as_string())
            sent = True
        except Exception:
            # Delivery stays best-effort, but the reason is recorded.
            logger.exception("Sending the OTP email failed")
            sent = False
    append_activity({"type":"otp_sent","user_id":target["user_id"],"username":target["username"],"email_sent":sent})
    # Handing the code back to the caller after it was mailed would let anyone
    # who knows a user name reset that account's password.
    return {"success": True, "otp_sent": sent, "otp": None if sent else otp}

def verify_otp_and_reset(username_or_email, otp, new_password):
    """Set a new password after validating a previously issued OTP.

    Args:
        username_or_email: user name or email address of the account,
            compared ignoring case and surrounding whitespace.
        otp: code entered by the user.
        new_password: replacement password (stored as a bcrypt hash).

    Returns:
        dict: {"success": True}, or {"success": False, "error": message} with
        "User not found", "Invalid OTP", "OTP expired" or
        "New password is required".
    """
    ident = str(username_or_email or "").strip().lower()
    if not ident:
        # An empty identifier would otherwise match every account that has
        # no email address stored.
        return {"success": False, "error": "User not found"}

    def _matches(user):
        name = str(user.get("username") or "").strip().lower()
        mail = str(user.get("email") or "").strip().lower()
        return ident in (name, mail)

    users = read_users()
    target = next((u for u in users if _matches(u)), None)
    if not target:
        return {"success": False, "error": "User not found"}
    stored = target.get("otp")
    supplied = str(otp or "").strip()
    # A pending OTP must exist: comparing str(None) with user input would let
    # the literal text "None" pass for accounts that never requested a code.
    if not stored or not supplied or not secrets.compare_digest(str(stored).encode(), supplied.encode()):
        return {"success": False, "error": "Invalid OTP"}
    try:
        exp = datetime.fromisoformat(target.get("otp_expiry"))
    except (TypeError, ValueError):
        # A missing or unreadable expiry is treated as expired, not as valid.
        exp = None
    if exp is None or datetime.utcnow() > exp:
        return {"success": False, "error": "OTP expired"}
    if not new_password:
        return {"success": False, "error": "New password is required"}
    target["password_hash"] = _hash_password(new_password)
    # The code is single-use.
    target.pop("otp", None); target.pop("otp_expiry", None)
    write_users(users)
    append_activity({"type":"password_reset","user_id":target["user_id"],"username":target["username"]})
    return {"success": True}


Overwriting backend/auth_module.py


In [19]:
%%writefile backend/code_generator_module.py
# backend/code_generator_module.py (fixed absolute imports)
import os, logging
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch
from storage import append_activity

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("code_gen")

# Display name (the `model_key` accepted by generate_code) -> model id passed to from_pretrained.
MODEL_CHOICES = {
    "Gemma-2B-IT": "google/gemma-2b-it",
    "DeepSeek-Coder-1.3B": "deepseek-ai/deepseek-coder-1.3b-instruct",
    "Phi-2": "microsoft/phi-2"
}

# model id -> {"model", "tokenizer", "device"}; filled by _load so each model is loaded once per process.
_model_cache = {}

def _load(model_id: str):
    """Load (or fetch from the cache) the tokenizer and model for `model_id`.

    With CUDA available the model is loaded in float16 with device_map="auto";
    otherwise it is loaded on the CPU. The HF_TOKEN environment variable, when
    set, is used to authenticate the downloads.

    Returns:
        dict: {"model", "tokenizer", "device"}, where "device" is the device
        of the model's first parameter.
    """
    if model_id in _model_cache:
        return _model_cache[model_id]
    hf = os.environ.get("HF_TOKEN", None)
    use_cuda = torch.cuda.is_available()
    logger.info("Loading %s (cuda=%s)", model_id, use_cuda)
    # `token` is the current name of the authentication argument; the older
    # `use_auth_token` spelling is deprecated.
    # NOTE(review): trust_remote_code=True executes code shipped with the
    # model repository; keep it only for model ids you trust.
    tokenizer = AutoTokenizer.from_pretrained(model_id, token=hf, trust_remote_code=True)
    if use_cuda:
        model = AutoModelForCausalLM.from_pretrained(model_id, token=hf, device_map="auto", torch_dtype=torch.float16)
    else:
        model = AutoModelForCausalLM.from_pretrained(model_id, token=hf, low_cpu_mem_usage=True)
        model.to("cpu")
    _model_cache[model_id] = {"model": model, "tokenizer": tokenizer, "device": next(model.parameters()).device}
    return _model_cache[model_id]

def _extract_fenced_code(text: str) -> str:
    """Return the content of the first ``` fenced block in `text`, or `text` itself when there is no fence."""
    if "```" not in text:
        return text
    body = text.split("```")[1]
    first_line, newline, rest = body.partition("\n")
    tag = first_line.strip()
    # A short symbol-only first line directly after the fence (e.g. "python",
    # "c++") is the fence's language tag, not part of the code.
    if newline and tag and len(tag) <= 20 and all(ch.isalnum() or ch in "+#-_." for ch in tag):
        body = rest
    return body.strip()


def generate_code(user_id: str, prompt: str, language: str, model_key: str):
    """Generate `language` code for `prompt` with the selected model.

    Args:
        user_id: id recorded in the activity log.
        prompt: natural-language description of the code to produce.
        language: target language named in the instruction given to the model.
        model_key: key of MODEL_CHOICES.

    Returns:
        dict: {"success": True, "model": model id, "code": text} or
        {"success": False, "error": message}. Model loading and generation
        errors are reported through the dict, not raised.
    """
    model_id = MODEL_CHOICES.get(model_key)
    if not model_id:
        return {"success": False, "error": "Invalid model selection"}
    if not (prompt or "").strip():
        # Nothing to generate from; avoid loading a model for an empty request.
        return {"success": False, "error": "Prompt is empty"}
    try:
        meta = _load(model_id)
    except Exception as e:
        logger.exception("Model load error")
        return {"success": False, "error": f"Failed to load model {model_id}. Set HF_TOKEN and use a GPU-enabled runtime if required. Exception: {e}"}
    model = meta["model"]; tokenizer = meta["tokenizer"]; device = meta["device"]
    system_prompt = f"You are an expert {language} developer. Output only valid {language} code runnable without extra explanation."
    full = system_prompt + "\n\n" + prompt + "\n\n# Solution:\n"
    try:
        inputs = tokenizer(full, return_tensors="pt", truncation=True, max_length=1024)
        inputs = {k: v.to(device) for k, v in inputs.items()}
        prompt_len = inputs["input_ids"].shape[1]
        # Greedy decoding (do_sample=False): a temperature has no effect in
        # this mode, so none is passed. Gradients are not needed for inference.
        with torch.no_grad():
            out = model.generate(**inputs, max_new_tokens=512, do_sample=False, pad_token_id=tokenizer.eos_token_id)
        # The output sequence starts with the prompt tokens. Cutting them off
        # by position is reliable, whereas comparing decoded text with the
        # prompt fails when tokenization or truncation altered it.
        text = tokenizer.decode(out[0][prompt_len:], skip_special_tokens=True).strip()
        text = _extract_fenced_code(text)
        append_activity({"type":"generate","user_id":user_id,"model":model_id,"language":language,"prompt":prompt})
        return {"success":True,"model":model_id,"code":text}
    except Exception as e:
        logger.exception("Generation error")
        return {"success": False, "error": f"Generation failed: {e}"}


Overwriting backend/code_generator_module.py


In [20]:
%%writefile backend/code_explainer_module.py
# backend/code_explainer_module.py (fixed absolute imports)
import os, logging, ast
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch
from storage import append_activity

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("code_explain")

# Display name (the `model_key` accepted by explain_code) -> model id passed to from_pretrained.
MODEL_CHOICES = {
    "Gemma-2B-IT": "google/gemma-2b-it",
    "DeepSeek-Coder-1.3B": "deepseek-ai/deepseek-coder-1.3b-instruct",
    "Phi-2": "microsoft/phi-2"
}

# model id -> {"model", "tokenizer", "device"}; filled by _load so each model is loaded once per process.
_model_cache = {}

def _load(model_id: str):
    """Load (or fetch from the cache) the tokenizer and model for `model_id`.

    With CUDA available the model is loaded in float16 with device_map="auto";
    otherwise it is loaded on the CPU. The HF_TOKEN environment variable, when
    set, is used to authenticate the downloads.

    Returns:
        dict: {"model", "tokenizer", "device"}, where "device" is the device
        of the model's first parameter.
    """
    if model_id in _model_cache:
        return _model_cache[model_id]
    hf = os.environ.get("HF_TOKEN", None)
    use_cuda = torch.cuda.is_available()
    # `token` is the current name of the authentication argument; the older
    # `use_auth_token` spelling is deprecated.
    # NOTE(review): trust_remote_code=True executes code shipped with the
    # model repository; keep it only for model ids you trust.
    tokenizer = AutoTokenizer.from_pretrained(model_id, token=hf, trust_remote_code=True)
    if use_cuda:
        model = AutoModelForCausalLM.from_pretrained(model_id, token=hf, device_map="auto", torch_dtype=torch.float16)
    else:
        model = AutoModelForCausalLM.from_pretrained(model_id, token=hf, low_cpu_mem_usage=True)
        model.to("cpu")
    _model_cache[model_id] = {"model": model, "tokenizer": tokenizer, "device": next(model.parameters()).device}
    return _model_cache[model_id]

def explain_code(user_id: str, code: str, language: str, model_key: str, style: str="Beginner-Friendly"):
    """Explain `code` with the selected model.

    For Python input the names of the functions and classes found by `ast`
    are added to the prompt as a structure summary.

    Args:
        user_id: id recorded in the activity log.
        code: source code to explain.
        language: language of `code`.
        model_key: key of MODEL_CHOICES.
        style: explanation style named in the instruction given to the model.

    Returns:
        dict: {"success": True, "model": model id, "explanation": text} or
        {"success": False, "error": message}. Model loading and generation
        errors are reported through the dict, not raised.
    """
    model_id = MODEL_CHOICES.get(model_key)
    if not model_id:
        return {"success": False, "error": "Invalid model selection"}
    if not (code or "").strip():
        # Nothing to explain; avoid loading a model for an empty request.
        return {"success": False, "error": "No code provided"}
    try:
        meta = _load(model_id)
    except Exception as e:
        logger.exception("Model load error")
        return {"success": False, "error": f"Failed to load model {model_id}. Set HF_TOKEN and use a GPU-enabled runtime if required. Exception: {e}"}
    tokenizer = meta["tokenizer"]; model = meta["model"]; device = meta["device"]
    structure = ""
    if language.lower()=="python":
        try:
            tree = ast.parse(code)
            # `async def` functions are separate node types and are listed as well.
            funcs = [n.name for n in ast.walk(tree) if isinstance(n, (ast.FunctionDef, ast.AsyncFunctionDef))]
            classes = [n.name for n in ast.walk(tree) if isinstance(n, ast.ClassDef)]
            structure = f"Functions: {funcs or 'None'}\nClasses: {classes or 'None'}\n"
        except Exception:
            structure = "AST parse failed or not applicable.\n"
    prompt = f"You are an expert {language} developer. Provide a {style} explanation covering logic, time complexity, edge cases, and improvements.\n\nStructure:\n{structure}\nCode:\n{code}\n\nExplanation:\n"
    try:
        inputs = tokenizer(prompt, return_tensors="pt", truncation=True, max_length=1024)
        inputs = {k:v.to(device) for k,v in inputs.items()}
        prompt_len = inputs["input_ids"].shape[1]
        # Greedy decoding (do_sample=False): a temperature has no effect in
        # this mode, so none is passed. Gradients are not needed for inference.
        with torch.no_grad():
            out = model.generate(**inputs, max_new_tokens=512, do_sample=False, pad_token_id=tokenizer.eos_token_id)
        # The output sequence starts with the prompt tokens. Cutting them off
        # by position is reliable, whereas comparing decoded text with the
        # prompt fails when tokenization or truncation altered it.
        text = tokenizer.decode(out[0][prompt_len:], skip_special_tokens=True).strip()
        append_activity({"type":"explain","user_id":user_id,"model":model_id,"language":language,"snippet":code[:200]})
        return {"success":True,"model":model_id,"explanation":text}
    except Exception as e:
        logger.exception("Explain error")
        return {"success":False,"error":f"Explain failed: {e}"}


Overwriting backend/code_explainer_module.py


In [24]:
%%writefile app.py
# app.py - CodeGenie Streamlit front-end (patched: safe History DataFrame handling)
import streamlit as st, os, sys, io
st.set_page_config("CodeGenie", layout="wide")
sys.path.append(os.path.abspath("backend"))

from auth_module import register, login, decode_token, generate_and_send_otp, verify_otp_and_reset
from storage import read_users, write_users, append_activity, get_avatar_path, save_avatar, read_activity
from code_generator_module import generate_code, MODEL_CHOICES as GEN_MODELS
from code_explainer_module import explain_code, MODEL_CHOICES as EXP_MODELS
from search_index import build_index, search
import plotly.express as px
import pandas as pd
import base64

# Make sure every session key read further down exists (None until a value is stored).
for state_key in ("access_token","refresh_token","user_id","role","flash"):
    if state_key not in st.session_state:
        st.session_state[state_key] = None

def show_flash():
    """Display the pending one-shot "flash" message, if any, and clear it."""
    message = st.session_state.get("flash")
    if not message:
        return
    st.success(message)
    st.session_state["flash"] = None

# Sidebar auth
def _rerun_app():
    """Restart the script run so the page is redrawn from the updated session state."""
    # st.rerun is the current API; st.experimental_rerun is its older name.
    rerun = getattr(st, "rerun", None) or getattr(st, "experimental_rerun")
    rerun()

st.sidebar.title("CodeGenie")
if st.session_state.get("access_token"):
    payload = decode_token(st.session_state["access_token"])
    if payload:
        uid = payload.get("sub")
        users = read_users()
        me = next((u for u in users if u.get("user_id")==uid), {})
        display_name = me.get("username","you")
        st.sidebar.write(f"Signed in: **{display_name}**")
        avatar_path = get_avatar_path(uid)
        if avatar_path:
            st.sidebar.image(avatar_path, width=64)
    if st.sidebar.button("Logout"):
        # Clear every credential, including the refresh token.
        for state_key in ("access_token","refresh_token","user_id","role"):
            st.session_state[state_key] = None
        st.session_state["flash"] = "Logged out."
        # Redraw immediately; otherwise the sidebar above keeps showing the
        # signed-in state until the next interaction.
        _rerun_app()
else:
    auth_tab = st.sidebar.selectbox("Auth", ["Login","Register","Recover Password"])
    if auth_tab == "Login":
        user = st.sidebar.text_input("Username")
        pwd = st.sidebar.text_input("Password", type="password")
        if st.sidebar.button("Login"):
            res = login(user, pwd)
            if res.get("success"):
                st.session_state["access_token"] = res["access_token"]
                st.session_state["refresh_token"] = res["refresh_token"]
                st.session_state["user_id"] = res["user_id"]
                st.session_state["role"] = res["role"]
                st.session_state["flash"] = "Logged in."
                # The flash message is shown by the next run, with the
                # sidebar already in its signed-in form.
                _rerun_app()
            else:
                st.sidebar.error(res.get("error","Login failed"))
    elif auth_tab == "Register":
        new_user = st.sidebar.text_input("Choose username")
        new_pwd = st.sidebar.text_input("Choose password", type="password")
        new_email = st.sidebar.text_input("Email (for OTP)")
        sec_q = st.sidebar.text_input("Security question (fallback)")
        sec_a = st.sidebar.text_input("Security answer (fallback)")
        make_admin = st.sidebar.checkbox("Make admin (restricted)")
        if st.sidebar.button("Register"):
            res = register(new_user, new_pwd, new_email, sec_q, sec_a, make_admin)
            if res.get("success"):
                st.session_state["access_token"] = res["access_token"]
                st.session_state["refresh_token"] = res["refresh_token"]
                st.session_state["user_id"] = res["user_id"]
                st.session_state["role"] = "admin" if make_admin else "user"
                st.session_state["flash"] = "Registration successful."
                _rerun_app()
            else:
                st.sidebar.error(res.get("error","Registration failed"))
    else:
        who = st.sidebar.text_input("Username or registered email")
        if st.sidebar.button("Send OTP"):
            res = generate_and_send_otp(who)
            if res.get("success"):
                st.sidebar.success("OTP generated. Check your email or see OTP printed if SMTP not configured.")
                if res.get("otp"):
                    st.sidebar.info(f"OTP (for testing): {res.get('otp')}")
            else:
                # Report failures (e.g. unknown user) instead of staying silent.
                st.sidebar.error(res.get("error","Could not generate OTP"))
        otp = st.sidebar.text_input("Enter OTP")
        newp = st.sidebar.text_input("New password", type="password")
        if st.sidebar.button("Reset password"):
            res = verify_otp_and_reset(who, otp, newp)
            if res.get("success"):
                st.sidebar.success("Password reset successful. Please login.")
            else:
                st.sidebar.error(res.get("error","Reset failed"))

show_flash()

st.title("CodeGenie — AI Explainer & Code Generator")

# Visitors without an access token only get the three demo tabs; the script
# stops after rendering them.
if not st.session_state.get("access_token"):
    st.info("Please login/register to use full features. Demo below does attempt model use and will error if models can't be loaded.")
    gen_tab, explain_tab, search_tab = st.tabs(["Generator Demo","Explainer Demo","Search Demo"])

    with gen_tab:
        demo_prompt = st.text_area("Demo prompt", height=150)
        demo_gen_model = st.selectbox("Model", list(GEN_MODELS.keys()))
        demo_gen_language = st.selectbox("Language", ["Python","JavaScript","SQL"])
        if st.button("Generate demo code"):
            gen_res = generate_code("demo", demo_prompt, demo_gen_language, demo_gen_model)
            if not gen_res.get("success"):
                st.error(gen_res.get("error"))
            else:
                highlight = "python" if demo_gen_language.lower()=="python" else None
                st.code(gen_res["code"], language=highlight)

    with explain_tab:
        demo_code = st.text_area("Code to explain (demo)", height=220)
        demo_exp_model = st.selectbox("Model (explainer)", list(EXP_MODELS.keys()))
        demo_exp_language = st.selectbox("Language for demo", ["Python","JavaScript","SQL"])
        if st.button("Explain demo code"):
            exp_res = explain_code("demo", demo_code, demo_exp_language, demo_exp_model)
            if not exp_res.get("success"):
                st.error(exp_res.get("error"))
            else:
                st.markdown(exp_res["explanation"])

    with search_tab:
        demo_query = st.text_input("Search demo (indexes will be empty until you use the app)")
        if st.button("Search demo"):
            # The index is rebuilt from the activity log before every query.
            build_index()
            demo_hits = search(demo_query)
            st.write(f"Found {len(demo_hits)} results")
            for hit in demo_hits:
                st.write(hit.get("timestamp"), hit.get("type"), hit.get("_score"))
    st.stop()

# Logged-in user: validate the token, resolve the account record, build the tabs.
payload = decode_token(st.session_state["access_token"])
if not payload:
    st.error("Session invalid/expired. Login again.")
    st.stop()

user_id = payload.get("sub")
role = payload.get("role","user")
users = read_users()
# Stored record of the signed-in user; an empty dict when no record has this id.
me = {}
for candidate in users:
    if candidate.get("user_id")==user_id:
        me = candidate
        break

# The Admin tab is appended last and only for tokens carrying the admin role.
tab_names = ["Generator","Explainer","History","Search","Profile"]
if role=="admin":
    tab_names.append("Admin")
tabs = st.tabs(tab_names)

# Generator
with tabs[0]:
    st.header("Text → Code Generator")
    model_choice = st.selectbox("Model", list(GEN_MODELS.keys()))
    language = st.selectbox("Language", ["Python","JavaScript","SQL"])
    prompt = st.text_area("Describe what to implement:", height=180)
    if st.button("Generate code"):
        with st.spinner("Generating..."):
            res = generate_code(user_id, prompt, language, model_choice)
        if res.get("success"):
            # Keep the result in the session: a button is only True during the
            # run triggered by its click, so anything rendered inside this
            # branch disappears as soon as another widget is used.
            st.session_state["gen_result"] = {
                "user_id": user_id,
                "code": res["code"],
                "model": res["model"],
                "language": language,
            }
        else:
            st.session_state["gen_result"] = None
            st.error(res.get("error"))

    # Rendered outside the button branch so the feedback widgets survive the
    # rerun caused by clicking "Submit generator feedback".
    gen_result = st.session_state.get("gen_result")
    if gen_result and gen_result.get("user_id") == user_id:
        st.code(gen_result["code"], language="python" if gen_result["language"].lower()=="python" else None)
        rating = st.radio("Rate output (1-5):", [1,2,3,4,5], index=4, horizontal=True, key="gen_rating")
        comment = st.text_area("Comment (optional)", key="gen_comment")
        if st.button("Submit generator feedback"):
            append_activity({"type":"feedback","user_id":user_id,"module":"generator","model":gen_result["model"],"rating":rating,"comment":comment})
            st.success("Thanks for feedback.")
            # reindex
            build_index()

# Explainer
with tabs[1]:
    st.header("Code → Explanation")
    model_choice_e = st.selectbox("Model (explainer)", list(EXP_MODELS.keys()))
    language_e = st.selectbox("Language of code", ["Python","JavaScript","SQL"])
    code_in = st.text_area("Paste code to explain:", height=220)
    style = st.selectbox("Explain style", ["Beginner-Friendly","Technical Deep-Dive","Step-by-Step Guide","Bullet Points"])
    if st.button("Explain code"):
        with st.spinner("Explaining..."):
            res = explain_code(user_id, code_in, language_e, model_choice_e, style)
        if res.get("success"):
            # Keep the result in the session: a button is only True during the
            # run triggered by its click, so anything rendered inside this
            # branch disappears as soon as another widget is used.
            st.session_state["exp_result"] = {
                "user_id": user_id,
                "explanation": res["explanation"],
                "model": res["model"],
            }
        else:
            st.session_state["exp_result"] = None
            st.error(res.get("error"))

    # Rendered outside the button branch so the feedback widgets survive the
    # rerun caused by clicking "Submit explainer feedback".
    exp_result = st.session_state.get("exp_result")
    if exp_result and exp_result.get("user_id") == user_id:
        st.markdown(exp_result["explanation"])
        rating = st.radio("Rate explanation (1-5):", [1,2,3,4,5], index=4, horizontal=True, key="exp_rating")
        comment = st.text_area("Comment (optional)", key="exp_comment")
        if st.button("Submit explainer feedback"):
            append_activity({"type":"feedback","user_id":user_id,"module":"explainer","model":exp_result["model"],"rating":rating,"comment":comment})
            st.success("Feedback saved.")
            build_index()

# History
with tabs[2]:
    st.header("Your Activity History")
    # Entries of the signed-in user only, newest timestamp first.
    own_entries = sorted(
        (entry for entry in read_activity() if entry.get("user_id")==user_id),
        key=lambda entry: entry.get("timestamp",""),
        reverse=True,
    )
    history_df = pd.DataFrame(own_entries)
    if history_df.empty:
        st.info("No activity yet.")
    else:
        # reindex adds the display columns an entry type never logs (as NaN);
        # those gaps are shown as empty cells.
        shown_columns = ["timestamp","type","model","language","rating","comment"]
        st.dataframe(history_df.reindex(columns=shown_columns).fillna(""))

# Search
with tabs[3]:
    st.header("Search your or global history")
    q = st.text_input("Search query")
    scope = st.selectbox("Scope", ["My activity", "Global (admin only)"])
    if st.button("Search"):
        max_shown = 20  # number of results displayed
        if scope!="My activity" and role!="admin":
            # Refuse before touching the index: no work is done for a search
            # the user is not allowed to run.
            st.error("Global search only for admins.")
            results = []
        else:
            index = build_index()
            # Rank every indexed document, filter, then cut to the display
            # limit. Cutting to 20 first would hide a user's own matches
            # whenever other users' entries score higher.
            results = search(q, top_k=max(max_shown, index.get("docs_count", 0)))
            if scope=="My activity":
                results = [r for r in results if r.get("user_id")==user_id]
            results = results[:max_shown]
        st.write(f"Found {len(results)} results")
        for r in results:
            st.write(r.get("timestamp"), "|", r.get("type"), "|", r.get("username", r.get("user_id")), "| score:", r.get("_score"))
            if r.get("prompt"):
                st.code(r.get("prompt")[:1000])
            if r.get("snippet"):
                st.code(r.get("snippet")[:1000])

# Profile (avatar upload)
with tabs[4]:
    st.header("Profile")
    st.write("Username:", me.get("username"))
    st.write("Email:", me.get("email"))
    st.write("Role:", me.get("role"))
    avatar_path = get_avatar_path(user_id)
    if avatar_path:
        st.image(avatar_path, width=150)
    uploaded = st.file_uploader("Upload avatar (png/jpg)", type=["png","jpg","jpeg"])
    if uploaded:
        raw = uploaded.getvalue()
        saved = save_avatar(user_id, uploaded.name, raw)
        st.success(f"Avatar saved: {saved}")
        st.image(saved, width=150)

    # Identifier used for the OTP round-trip (user name, else email address).
    account_ident = me.get("username") or me.get("email")
    if st.button("Send OTP to change password"):
        res = generate_and_send_otp(account_ident)
        if res.get("success"):
            st.success("OTP generated. Check your email (or sidebar testing OTP).")
            if res.get("otp"):
                st.info(f"OTP (for testing): {res.get('otp')}")
        else:
            # Report failures (e.g. account not found) instead of staying silent.
            st.error(res.get("error","Could not generate OTP"))
    otp = st.text_input("Enter OTP to change password")
    newp = st.text_input("New password", type="password")
    if st.button("Confirm password change"):
        res = verify_otp_and_reset(account_ident, otp, newp)
        if res.get("success"):
            st.success("Password changed.")
        else:
            # A wrong or expired OTP must be visible to the user.
            st.error(res.get("error","Password change failed"))

# Admin dashboard
if role=="admin":
    with tabs[-1]:
        st.header("Admin Analytics & Management")
        users = read_users()
        acts = read_activity()
        st.subheader("User growth (registrations over time)")
        # prepare chart data: the first 10 characters of created_at are the date
        reg_dates = [u.get("created_at","")[:10] for u in users if u.get("created_at")]
        if reg_dates:
            df_reg = pd.DataFrame({"date": reg_dates})
            df_reg = df_reg.groupby("date").size().reset_index(name="count")
            fig = px.bar(df_reg, x="date", y="count", title="New users per day")
            st.plotly_chart(fig, use_container_width=True)
        else:
            st.info("No registrations yet.")

        st.subheader("Activity breakdown by language and module")
        df_act = pd.DataFrame(acts)
        if not df_act.empty:
            # Each activity type logs its own set of fields, so a column only
            # exists once some entry carried it (a log holding nothing but
            # register/login entries has no "language" or "model"). Add the
            # columns used below as empty ones instead of failing on lookup.
            for needed in ("language","model","user_id","type","module","rating"):
                if needed not in df_act.columns:
                    df_act[needed] = None
            # languages pie
            langs = df_act["language"].fillna("unknown")
            fig2 = px.pie(names=langs, title="Language usage (all activities)")
            st.plotly_chart(fig2, use_container_width=True)
            # model usage bar
            models = df_act["model"].fillna("unknown")
            mdf = models.value_counts().reset_index()
            mdf.columns = ["model","count"]
            fig3 = px.bar(mdf, x="model", y="count", title="Model usage")
            st.plotly_chart(fig3, use_container_width=True)
            # top users
            top_users = df_act["user_id"].value_counts().head(10).reset_index()
            top_users.columns = ["user_id","count"]
            # map user ids to usernames (ids without a stored user keep the id)
            uid_to_name = {u.get("user_id"):u.get("username") for u in users}
            top_users["username"] = top_users["user_id"].map(uid_to_name).fillna(top_users["user_id"])
            fig4 = px.bar(top_users, x="username", y="count", title="Top active users")
            st.plotly_chart(fig4, use_container_width=True)
            # feedback average per module; work on a copy so that assigning
            # the numeric rating does not write into a slice of df_act
            fb = df_act[df_act["type"]=="feedback"].copy()
            if not fb.empty:
                fb["rating"] = pd.to_numeric(fb["rating"], errors="coerce")
                fb_avg = fb.groupby("module")["rating"].mean().reset_index()
                fig5 = px.bar(fb_avg, x="module", y="rating", title="Average rating per module")
                st.plotly_chart(fig5, use_container_width=True)
        else:
            st.info("No activities yet.")

        st.subheader("User management")
        for u in users:
            cols = st.columns([2,2,1,1])
            cols[0].write(u.get("username"))
            cols[1].write(u.get("email"))
            cols[2].write(u.get("role"))
            if cols[3].button("Delete", key=f"del_{u.get('user_id')}"):
                remaining = [x for x in users if x.get("user_id")!=u.get("user_id")]
                write_users(remaining)
                # st.rerun is the current API; st.experimental_rerun is its
                # older name and is used only when st.rerun is unavailable.
                rerun = getattr(st, "rerun", None) or getattr(st, "experimental_rerun")
                rerun()


Overwriting app.py


In [25]:
# Cell 10 — Launch Streamlit and optionally ngrok
import subprocess, time, socket, os
from pyngrok import ngrok
# The tunnel is optional: it is configured only when NGROK_TOKEN is set.
ngrok_token = os.environ.get("NGROK_TOKEN")
if ngrok_token:
    ngrok.set_auth_token(ngrok_token)

# Stop Streamlit processes left over from an earlier run of this cell so the
# port is free again.
try:
    subprocess.run(["pkill", "-f", "streamlit"], check=False)
except OSError:
    # pkill could not be executed; there is nothing to clean up.
    pass

# The server process receives its own copies of the log descriptors, so the
# notebook's file objects are closed right after the process has been spawned
# rather than staying open for the life of the kernel.
with open("streamlit.log", "w") as log_out, open("streamlit.err", "w") as log_err:
    proc = subprocess.Popen(["streamlit","run","app.py","--server.port","8501"], stdout=log_out, stderr=log_err)

def wait_port(host="localhost", port=8501, timeout=60):
    """Poll until a TCP connection to (host, port) succeeds or `timeout` seconds have elapsed.

    Each attempt uses a 2-second connect time-out and a failed attempt is
    followed by a 1-second pause.

    Returns:
        bool: True on the first successful connection, False on time-out.
    """
    began = time.time()
    while True:
        if not (time.time() - began < timeout):
            return False
        try:
            probe = socket.create_connection((host, port), timeout=2)
        except Exception:
            time.sleep(1)
            continue
        probe.close()
        return True

print("Waiting for Streamlit to start...")
if wait_port():
    print("Streamlit started at http://localhost:8501")
    if ngrok_token:
        public_url = ngrok.connect(8501)
        print("Public URL:", public_url.public_url)
else:
    print("Streamlit failed to start. See streamlit.err")
    !tail -n 200 streamlit.err


Waiting for Streamlit to start...
Streamlit started at http://localhost:8501
Public URL: https://nonbureaucratically-lumpish-lashaun.ngrok-free.dev
