<a href="https://colab.research.google.com/github/DineshGujjeti/Infosys_Springboard_CodeGenie/blob/main/Milestone3/code/milestone3_codeGenie.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
from google.colab import userdata

# Secrets are read from the Colab secret store (via `userdata`) instead of
# being pasted into the notebook, so no credential is committed with the file.
def _read_colab_secret(name, default=''):
    """Return the Colab secret *name*, or *default* if it cannot be read.

    Any failure of `userdata.get` (secret not defined, notebook access not
    granted, ...) is reported and turned into the fallback value so that the
    rest of the setup cell keeps running.
    """
    try:
        value = userdata.get(name)
    except Exception as exc:
        print(f"Colab secret '{name}' is unavailable ({type(exc).__name__}); using fallback.")
        return default
    return value or default


# 1. Your Hugging Face READ token (must be NEWLY generated to avoid 401)
HF_TOKEN_VALUE = _read_colab_secret('HF_TOKEN')

# 2. Your JWT secret key. When no 'JWT_SECRET_KEY' secret is stored, a random
#    key is generated for this runtime; tokens are only kept in the Streamlit
#    session, so a per-runtime key is sufficient.
import secrets

JWT_SECRET_VALUE = _read_colab_secret('JWT_SECRET_KEY') or secrets.token_urlsafe(48)

# 3. Your ngrok authtoken
NGROK_TOKEN_VALUE = _read_colab_secret('NGROK_TOKEN')

# Set secrets as environment variables for reliable access by the Streamlit app.
# Empty values are not exported, so consumers see "unset" rather than an
# empty-string credential.
for _env_name, _env_value in (
    ('HF_TOKEN', HF_TOKEN_VALUE),
    ('JWT_SECRET_KEY', JWT_SECRET_VALUE),
    ('NGROK_TOKEN', NGROK_TOKEN_VALUE),
):
    if _env_value:
        os.environ[_env_name] = _env_value
    else:
        print(f"Warning: {_env_name} is not set; features that need it will fail.")

# Install dependencies
# Install all required packages

# Install PyTorch, Transformers, Streamlit, and other utilities
!pip install --upgrade pip

# PyTorch (with CUDA if available)
!pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118

# Transformers for model loading
!pip install transformers accelerate

# Streamlit and utilities
!pip install streamlit pyngrok PyJWT bcrypt requests graphviz ipywidgets pandas matplotlib seaborn radon

# ... (rest of your initial setup code)

# Install dependencies
!pip install --upgrade pip

# PyTorch (with CUDA if available)
!pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118

# Install bitsandbytes specifically for 4-bit quantization
!pip install bitsandbytes


In [None]:
%%writefile app.py

import streamlit as st
import sqlite3
import bcrypt
import jwt
import requests
import json
import os
import graphviz
import ast
import torch
# Import necessary components for local LLM loading
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
from datetime import datetime, timedelta
from google.colab import userdata

# --- 1. CONFIGURATION & SECRETS ---

# Secrets come from the process environment; the JWT and ngrok entries fall
# back to placeholder strings when the variable is absent.
HF_TOKEN = os.getenv('HF_TOKEN')
JWT_SECRET = os.getenv('JWT_SECRET_KEY', "dummy_jwt")
NGROK_TOKEN = os.getenv('NGROK_TOKEN', "dummy_ngrok")

# SQLite file that stores the user accounts
DB_NAME = "users.db"

# --- 2. DATABASE UTILITIES ---

def init_db():
    """Create the ``users`` table if it is missing and report emptiness.

    Returns:
        bool: True when the table currently holds no rows (the caller uses
        this to treat the next sign-up as the first user), False otherwise.
    """
    create_users_table = '''
    CREATE TABLE IF NOT EXISTS users (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        username TEXT UNIQUE NOT NULL,
        password_hash TEXT NOT NULL,
        security_question TEXT NOT NULL,
        security_answer_hash TEXT NOT NULL,
        role TEXT NOT NULL DEFAULT 'user'
    )
    '''
    connection = sqlite3.connect(DB_NAME)
    cursor = connection.cursor()
    cursor.execute(create_users_table)
    cursor.execute("SELECT COUNT(*) FROM users")
    (existing_users,) = cursor.fetchone()
    connection.commit()
    connection.close()
    return existing_users == 0

def hash_text(text):
    """Hash *text* with bcrypt using a freshly generated salt.

    The text is UTF-8 encoded before hashing; the value produced by
    ``bcrypt.hashpw`` is returned unchanged.
    """
    fresh_salt = bcrypt.gensalt()
    encoded = text.encode('utf-8')
    return bcrypt.hashpw(encoded, fresh_salt)

def check_hash(text, hashed):
    """Return True when *text* matches the bcrypt hash *hashed*.

    Args:
        text: Plain-text candidate (password or security answer).
        hashed: Stored bcrypt hash, as ``bytes`` or as ``str``.

    Returns:
        bool: True on a match; False on a mismatch or when the stored value
        is not a valid bcrypt hash.
    """
    # A hash that round-tripped through a text column/driver may arrive as
    # str; bcrypt only accepts bytes.
    if isinstance(hashed, str):
        hashed = hashed.encode('utf-8')
    try:
        return bcrypt.checkpw(text.encode('utf-8'), hashed)
    except ValueError:
        # Malformed stored hash: treat as "does not match" instead of
        # crashing the login / reset flow.
        return False

def add_user(username, password, question, answer, role='user'):
    """Insert a new user row.

    Args:
        username: Unique login name.
        password: Plain-text password; stored only as a bcrypt hash.
        question: Security question text, stored as given.
        answer: Plain-text security answer; stored only as a bcrypt hash.
        role: Role string stored with the user (defaults to ``'user'``).

    Returns:
        bool: True when the row was inserted, False when the insert violated
        a table constraint (e.g. the username already exists).
    """
    # Hash before touching the database so a hashing error cannot leave an
    # open connection behind.
    password_hash = hash_text(password)
    answer_hash = hash_text(answer)

    # Connect outside the try block: if connecting fails there is nothing to
    # close, and the original error is not masked by a failing `finally`.
    conn = sqlite3.connect(DB_NAME)
    try:
        c = conn.cursor()
        c.execute(
            "INSERT INTO users (username, password_hash, security_question, security_answer_hash, role) VALUES (?, ?, ?, ?, ?)",
            (username, password_hash, question, answer_hash, role)
        )
        conn.commit()
        return True
    except sqlite3.IntegrityError:
        return False
    finally:
        conn.close()

def verify_user(username, password):
    """Check a username/password pair against the stored bcrypt hash.

    Returns:
        dict | None: ``{"username": ..., "role": ...}`` when the user exists
        and the password matches, otherwise None.
    """
    connection = sqlite3.connect(DB_NAME)
    cursor = connection.cursor()
    cursor.execute("SELECT password_hash, role FROM users WHERE username=?", (username,))
    row = cursor.fetchone()
    connection.close()

    if not row:
        return None
    stored_hash, role = row
    if not check_hash(password, stored_hash):
        return None
    return {"username": username, "role": role}

def get_user_details(username):
    """Fetch the password-reset details stored for *username*.

    Returns:
        dict | None: A dict with the keys ``username``, ``question``,
        ``answer_hash`` and ``role``, or None when no such user exists.
    """
    connection = sqlite3.connect(DB_NAME)
    cursor = connection.cursor()
    cursor.execute("SELECT username, security_question, security_answer_hash, role FROM users WHERE username=?", (username,))
    row = cursor.fetchone()
    connection.close()

    if not row:
        return None
    # Column order in the SELECT matches the key order below.
    return dict(zip(("username", "question", "answer_hash", "role"), row))

def update_user_password(username, new_password):
    """Replace the stored password hash of *username* with a hash of *new_password*."""
    connection = sqlite3.connect(DB_NAME)
    cursor = connection.cursor()
    params = (hash_text(new_password), username)
    cursor.execute("UPDATE users SET password_hash = ? WHERE username = ?", params)
    connection.commit()
    connection.close()

def get_all_users():
    """Return every user as a list of ``(id, username, role)`` rows."""
    connection = sqlite3.connect(DB_NAME)
    cursor = connection.cursor()
    rows = cursor.execute("SELECT id, username, role FROM users").fetchall()
    connection.close()
    return rows

# --- 3. AUTHENTICATION (JWT) ---

def create_jwt(username, role):
    """Create a signed session token for a user.

    Args:
        username: Stored in the ``sub`` claim.
        role: Stored in the ``role`` claim.

    Returns:
        The HS256 token produced by ``jwt.encode`` with ``JWT_SECRET``; it
        expires 24 hours after it was issued.
    """
    from datetime import timezone

    # One timezone-aware timestamp for both claims: `datetime.utcnow()` is
    # deprecated, and calling it twice made `iat` and `exp` drift apart.
    issued_at = datetime.now(timezone.utc)
    payload = {
        'sub': username,
        'role': role,
        'iat': issued_at,
        'exp': issued_at + timedelta(hours=24)
    }
    token = jwt.encode(payload, JWT_SECRET, algorithm='HS256')
    return token

def verify_jwt(token):
    """Decode and validate a session token.

    Returns:
        The decoded payload when the token is valid; None (after showing a
        Streamlit error) when it is expired or otherwise invalid.
    """
    try:
        claims = jwt.decode(token, JWT_SECRET, algorithms=['HS256'])
    except jwt.ExpiredSignatureError:
        st.error("Session expired. Please log in again.")
    except jwt.InvalidTokenError:
        st.error("Invalid token. Please log in again.")
    else:
        return claims
    return None

# --- 4. HUGGING FACE LOCAL MODEL CALL  ---

# Model mapping: keys = user-friendly name shown in the UI, values = Hugging
# Face repo id handed to `load_model` / `generate_code`.
# Insertion order matters: the generator page pre-loads the FIRST entry, and
# the explainer page looks up "Gemma-2B-IT" by key.
MODELS = {
    # Using full model IDs for Hugging Face Transformers local loading
    "Gemma-2B-IT": "google/gemma-2b-it",
    "DeepSeek-Coder-1.3B": "deepseek-ai/deepseek-coder-1.3b-instruct",
    "Phi-2": "microsoft/phi-2"
}

# --- FIX: CACHING MODEL LOADING ---
@st.cache_resource
def _load_model_cached(model_path):
    """Load and cache the model/tokenizer pair for *model_path*.

    Raises whatever the Transformers loaders raise. Failures must propagate
    out of this cached function: returning a ``(None, None)`` sentinel from
    here would cache the failure and block every later retry.
    """
    st.info(f"Loading Model: {model_path}... This may take a few minutes for first load.")

    # Use the HF token for access to gated models (like Gemma); an empty
    # value is treated as "no token".
    token = os.environ.get('HF_TOKEN') or None

    # 4-bit quantization config for Colab T4 GPU (critical for memory)
    quantization_config = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch.bfloat16
    )

    # NOTE(review): trust_remote_code=True executes code shipped with the
    # model repository; keep MODELS limited to trusted repos.
    tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True, token=token)
    model = AutoModelForCausalLM.from_pretrained(
        model_path,
        quantization_config=quantization_config,
        device_map="auto",  # automatically selects GPU if available
        torch_dtype=torch.bfloat16,
        trust_remote_code=True,
        token=token  # Pass token to model loading as well
    )
    st.success(f"Model {model_path} loaded successfully!")
    return model, tokenizer


def load_model(model_path):
    """Load the model and tokenizer only once into GPU/CPU, using quantization.

    Args:
        model_path: Hugging Face repo id of the model to load.

    Returns:
        tuple: ``(model, tokenizer)`` on success. On failure a Streamlit error
        is shown and ``(None, None)`` is returned; the failure is not cached,
        so the next call tries to load again.
    """
    try:
        return _load_model_cached(model_path)
    except Exception as e:
        st.error(f"Failed to load model {model_path}: {e}")
        return None, None

def generate_code(prompt, model_path):
    """Generate code using the selected self-hosted model.

    Args:
        prompt: Text prompt passed to the tokenizer as-is.
        model_path: Hugging Face repo id; resolved through ``load_model``.

    Returns:
        str: The generated continuation wrapped in a ```` ```python ````
        Markdown fence, or a string starting with ``"Error:"`` when the model
        could not be loaded.
    """
    model, tokenizer = load_model(model_path)

    if model is None:
        return f"Error: Model {model_path} failed to load."

    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
    attention_mask = inputs.attention_mask

    output_ids = model.generate(
        inputs.input_ids,
        attention_mask=attention_mask,
        max_new_tokens=512,
        temperature=0.1,
        do_sample=True,
        pad_token_id=tokenizer.eos_token_id
    )

    # Keep only the newly generated tokens (drop the echoed prompt).
    response_tokens = output_ids[0][inputs.input_ids.shape[-1]:]

    # Skip special tokens so end-of-sequence / control markers do not leak
    # into the displayed code. Newlines are ordinary tokens and are kept.
    code = tokenizer.decode(response_tokens, skip_special_tokens=True)

    # Wrap output in a Markdown code block so Streamlit renders it verbatim.
    formatted_code = f"```python\n{code.strip()}\n```"

    return formatted_code

# NOTE: Models are now run locally, so this helper simply delegates to generate_code.
def query_hf_model(payload, model_name):
    """Run *payload* through the local model registered as *model_name*.

    Returns the text from ``generate_code``, or an ``"Error: ..."`` string if
    generation raised. An unknown *model_name* raises ``KeyError``.
    """
    repo_id = MODELS[model_name]
    try:
        result = generate_code(payload, repo_id)
    except Exception as exc:
        # Any failure inside generate_code/load_model becomes an error string
        return f"Error: Code Generation/Explanation failed: {str(exc)}"
    return result

# --- 5. AST VISUALIZER  ---

class ASTVisualizer:
    """Render the abstract syntax tree of Python source as a Graphviz digraph."""

    def __init__(self):
        self.graph = graphviz.Digraph(
            comment='Python AST',
            graph_attr={'rankdir': 'TB', 'bgcolor': '#333333'},
            node_attr={'style': 'filled', 'color': '#FF6347', 'fontcolor': 'white', 'fillcolor': '#444444'},
        )

    @staticmethod
    def _describe(node):
        """Return the label text for *node*; the first matching rule wins."""
        if isinstance(getattr(node, 'id', None), str):
            return f"Name\n(id='{node.id}')"
        if isinstance(node, ast.Constant):
            return f"Constant\n(value={repr(node.value)})"
        if isinstance(node, ast.Attribute):
            return f"Attribute\n(attr='{node.attr}')"
        if isinstance(node, ast.FunctionDef):
            return f"FunctionDef\n(name='{node.name}')"
        return type(node).__name__

    def traverse(self, node, parent_id=None):
        """Add *node* and, recursively, all of its AST children to the graph.

        Nodes are keyed by ``id(node)``; an edge from *parent_id* is added
        when one is given.
        """
        node_id = str(id(node))
        self.graph.node(node_id, label=self._describe(node))

        if parent_id:
            self.graph.edge(parent_id, node_id)

        # iter_child_nodes yields direct AST-valued fields and AST items of
        # list-valued fields, in field order.
        for child in ast.iter_child_nodes(node):
            self.traverse(child, node_id)

    def get_graph(self, code):
        """Parse *code* and return the populated graph, or None on failure.

        Failures are reported to the user through ``st.error``.
        """
        try:
            self.traverse(ast.parse(code))
        except SyntaxError as exc:
            st.error(f"Python Syntax Error: {exc}")
            return None
        except Exception as exc:
            st.error(f"Error building AST: {exc}")
            return None
        else:
            return self.graph

# --- 6. STREAMLIT APP UI  ---

def initialize_state():
    """Seed ``st.session_state`` with defaults for any key not yet present."""
    # Built on every call so mutable defaults (the chat history list) are
    # never shared between sessions.
    defaults = {
        "logged_in": False,
        "username": None,
        "role": None,
        "page": "Login",
        "chat_history": [],
        "last_output": "",
        "token": None,
    }
    for key, value in defaults.items():
        if key not in st.session_state:
            st.session_state[key] = value

def show_login_page(is_first_user):
    """Render the landing page with the "Login" and "Sign Up" tabs.

    Args:
        is_first_user: Truthy when no account exists yet; the sign-up form
            then announces, and assigns, the ``admin`` role instead of
            ``user``.

    On a successful login the session state (logged_in, username, role,
    token, page) is filled and the script is rerun. Sign-up only creates the
    account; the user still has to log in afterwards.
    """
    st.title("CodeGenie 🧞")
    col1, col2 = st.columns([2, 3])
    with col1:
        st.subheader("Welcome Back")
    with col2:
        with st.container(border=True):
            tab1, tab2 = st.tabs(["Login", "Sign Up"])
            with tab1:
                with st.form("login_form"):
                    username = st.text_input("Username", key="login_username")
                    password = st.text_input("Password", type="password", key="login_password")
                    login_button = st.form_submit_button("Login", use_container_width=True)
                    if login_button:
                        user_data = verify_user(username, password)
                        if user_data:
                            # Credentials accepted: record the session and issue a JWT.
                            st.session_state.logged_in = True
                            st.session_state.username = user_data["username"]
                            st.session_state.role = user_data["role"]
                            st.session_state.token = create_jwt(user_data["username"], user_data["role"])
                            st.session_state.page = "Generator"
                            st.rerun()
                        else:
                            st.error("Invalid username or password")
                # Placed after the form block, so it is a regular button.
                if st.button("Forgot Password?", type="primary"):
                    st.session_state.page = "Forgot Password"
                    st.rerun()
            with tab2:
                with st.form("signup_form"):
                    st.markdown("### Create Your Account")
                    if is_first_user:
                        st.info("✨ You will be the first user! You will be registered as an **Admin**.")
                    new_username = st.text_input("Username*")
                    new_password = st.text_input("Password*", type="password")
                    new_password_confirm = st.text_input("Confirm Password*", type="password")
                    st.markdown("---")
                    st.markdown("**Security Question (for password reset)**")
                    question = st.selectbox("Select a question*", ["What was the name of your first pet?", "What is your mother's maiden name?", "What city were you born in?", "What was your first car?"])
                    answer = st.text_input("Your Answer*", type="password")
                    signup_button = st.form_submit_button("Sign Up", use_container_width=True)
                    if signup_button:
                        # Validate in order: all fields present, then matching passwords.
                        if not (new_username and new_password and new_password_confirm and answer):
                            st.error("Please fill out all required fields.")
                        elif new_password != new_password_confirm:
                            st.error("Passwords do not match.")
                        else:
                            role = 'admin' if is_first_user else 'user'
                            # add_user returns False when the insert is rejected (duplicate username).
                            success = add_user(new_username, new_password, question, answer, role)
                            if success:
                                st.success(f"User '{new_username}' created successfully as {'an Admin' if role == 'admin' else 'a User'}. Please log in.")
                            else:
                                st.error("Username already exists.")

def show_forgot_password_page():
    """Render the three-step password reset flow.

    Progress is tracked in ``st.session_state.reset_step``:
      1. ask for the username and load that user's details,
      2. ask the stored security question and check the answer hash,
      3. ask for the new password and store it.
    ``st.session_state.reset_user`` holds the dict returned by
    ``get_user_details`` between steps. Each successful step calls
    ``st.rerun()`` so the next step is drawn on a fresh run.
    """
    st.title("Reset Password")
    with st.container(border=True):
        # First visit: initialise the flow state.
        if "reset_user" not in st.session_state:
            st.session_state.reset_user = None
            st.session_state.reset_step = 1
        if st.session_state.reset_step == 1:
            with st.form("step1_username"):
                username = st.text_input("Enter your username")
                submit_user = st.form_submit_button("Next")
                if submit_user:
                    user_details = get_user_details(username)
                    if user_details:
                        st.session_state.reset_user = user_details
                        st.session_state.reset_step = 2
                        st.rerun()
                    else:
                        st.error("Username not found.")
        if st.session_state.reset_step == 2:
            st.info(f"**Security Question:** {st.session_state.reset_user['question']}")
            with st.form("step2_answer"):
                answer = st.text_input("Your Answer", type="password")
                submit_answer = st.form_submit_button("Verify Answer")
                if submit_answer:
                    # The answer is compared against its stored bcrypt hash.
                    if check_hash(answer, st.session_state.reset_user['answer_hash']):
                        st.session_state.reset_step = 3
                        st.rerun()
                    else:
                        st.error("Incorrect answer.")
        if st.session_state.reset_step == 3:
            st.success("Verification successful!")
            with st.form("step3_new_password"):
                new_password = st.text_input("New Password", type="password")
                confirm_password = st.text_input("Confirm New Password", type="password")
                submit_pass = st.form_submit_button("Reset Password")
                if submit_pass:
                    if not new_password or new_password != confirm_password:
                        st.error("Passwords do not match or are empty.")
                    else:
                        update_user_password(st.session_state.reset_user['username'], new_password)
                        st.success("Password reset successfully! You can now log in.")
                        # Reset the flow and send the user back to the login page.
                        st.session_state.reset_step = 1
                        st.session_state.reset_user = None
                        st.session_state.page = "Login"
                        st.rerun()
    # Leaving the page at any step discards the reset progress.
    if st.button("← Back to Login"):
        st.session_state.page = "Login"
        st.session_state.reset_step = 1
        st.session_state.reset_user = None
        st.rerun()

def _copy_last_output_to_prompt():
    """on_click callback: copy the last generated output into the prompt box.

    The prompt text area owns the session key ``prompt_input``. Streamlit
    forbids assigning to a widget's key after that widget has been created in
    the current run, so the assignment has to happen in a callback, which
    runs before the script is re-executed.
    """
    if st.session_state.last_output:
        st.session_state.prompt_input = st.session_state.last_output


def show_generator_page():
    """Render the code generator page: model picker, chat history and prompt.

    The first model in ``MODELS`` is loaded up front; if that load fails the
    page stops after showing an error. Generated responses are appended to
    ``st.session_state.chat_history``; responses that do not start with
    ``"Error:"`` are also stored as ``st.session_state.last_output``.
    """
    st.title("🤖 Code Generator")

    # Check if a model is currently loaded, and display status if necessary
    model_keys = list(MODELS.keys())
    # This initial call forces the caching/loading to start immediately
    if model_keys and load_model(MODELS[model_keys[0]])[0] is None:
        st.error("Please wait for the model to load or check the error message above.")
        return

    selected_model = st.selectbox("Select AI Model", model_keys)
    chat_container = st.container(height=400, border=True)
    with chat_container:
        for entry in st.session_state.chat_history:
            with st.chat_message(entry["role"]):
                st.markdown(entry["content"])
    prompt = st.text_area("Your prompt:", key="prompt_input", height=100)
    col1, col2, col3 = st.columns([2, 1, 1])
    with col1:
        if st.button("Generate Code 🚀", use_container_width=True, type="primary"):
            if prompt:
                st.session_state.chat_history.append({"role": "user", "content": prompt})
                with chat_container:
                    with st.chat_message("user"):
                        st.markdown(prompt)
                    with st.chat_message("assistant"):
                        with st.spinner(f"CodeGenie is thinking with {selected_model}..."):
                            try:
                                response = generate_code(prompt, MODELS[selected_model])
                            except Exception as e:
                                response = f"Error: {str(e)}"
                            st.markdown(response)
                st.session_state.chat_history.append({"role": "assistant", "content": response})
                # Error strings are shown in the chat but never offered for copying.
                if not response.startswith("Error:"):
                    st.session_state.last_output = response
                st.rerun()
            else:
                st.warning("Please enter a prompt.")
    with col2:
        # The state change happens in the callback; the click itself already
        # triggers the rerun that shows the copied text.
        copy_clicked = st.button(
            "📋 Copy Last Output",
            use_container_width=True,
            on_click=_copy_last_output_to_prompt,
        )
        if copy_clicked and not st.session_state.last_output:
            st.toast("No output to copy yet.")
    with col3:
        if st.button("🗑️ Clear History", use_container_width=True):
            st.session_state.chat_history = []
            st.session_state.last_output = ""
            st.rerun()

def show_explainer_page():
    """Render the explainer page: AI explanation next to an AST diagram.

    The explanation always uses the "Gemma-2B-IT" model; the AST diagram is
    only produced when the selected language is Python.
    """
    st.title("🔬 Code Explainer & AST Visualizer")
    lang = st.selectbox("Select Language", ["Python", "JavaScript", "SQL"])
    code = st.text_area("Paste your code here:", height=250)

    if not st.button("Explain & Visualize", type="primary"):
        return
    if not code:
        st.warning("Please paste some code to explain.")
        return

    explanation_col, ast_col = st.columns(2)

    with explanation_col:
        st.subheader("AI Explanation")
        explainer_model_name = "Gemma-2B-IT"
        with st.spinner(f"Generating explanation with {explainer_model_name}..."):
            explain_prompt = (
                f"Explain the following {lang} code. Describe what it does, step by step. "
                "Provide the explanation in markdown format, using code blocks for the code."
                f"\n\n```\n{code}\n```"
            )
            # Inference runs on the locally loaded model
            explanation = query_hf_model(explain_prompt, explainer_model_name)
            st.markdown(explanation)

    with ast_col:
        st.subheader("Abstract Syntax Tree (AST)")
        if lang != "Python":
            st.info(f"AST visualization is only available for Python. The AI explanation should describe the {lang} code structure.")
        else:
            graph = ASTVisualizer().get_graph(code)
            if graph:
                st.success("Python AST Visualization:")
                st.graphviz_chart(graph)

def show_profile_page():
    """Render the profile page with the change-password form.

    The current password is verified first; only then are the new password
    fields checked for being non-empty and equal.
    """
    current_user = st.session_state.username
    st.title(f"Profile: {current_user}")
    st.info(f"**Username:** `{current_user}`\n\n**Role:** `{st.session_state.role}`")
    st.subheader("Change Password")
    with st.form("change_password_form"):
        current_password = st.text_input("Current Password", type="password")
        new_password = st.text_input("New Password", type="password")
        confirm_new_password = st.text_input("Confirm New Password", type="password")
        if st.form_submit_button("Update Password"):
            if not verify_user(st.session_state.username, current_password):
                st.error("Incorrect current password.")
            elif new_password and new_password == confirm_new_password:
                update_user_password(st.session_state.username, new_password)
                st.success("Password updated successfully!")
            else:
                st.error("New passwords do not match or are empty.")

def show_admin_panel():
    """Render the admin-only table of registered users.

    Users whose session role is not ``'admin'`` only see an error message.
    """
    if st.session_state.role != 'admin':
        st.error("You do not have permission to view this page.")
        return

    st.title("👑 Admin Panel")
    st.subheader("All Registered Users")

    rows = get_all_users()
    if not rows:
        st.info("No users found (except you).")
        return

    table = [
        {"ID": user_id, "Username": username, "Role": role}
        for user_id, username, role in rows
    ]
    st.dataframe(table, use_container_width=True, hide_index=True)

# --- 7. MAIN APP ROUTER  ---

def main():
    """Application entry point: set up state, authenticate, route to a page.

    On every run the stored JWT (if any) is re-validated and the session
    identity is refreshed from its claims; an invalid or expired token logs
    the user out. Logged-out users see the login or forgot-password page;
    logged-in users get the sidebar navigation plus the selected page.
    """
    st.set_page_config(page_title="CodeGenie", page_icon="🧞", layout="wide")
    is_first_user = init_db()
    initialize_state()

    if st.session_state.token:
        payload = verify_jwt(st.session_state.token)
        if payload:
            st.session_state.logged_in = True
            st.session_state.username = payload['sub']
            st.session_state.role = payload['role']
        else:
            st.session_state.logged_in = False
            st.session_state.token = None
            st.session_state.page = "Login"

    if not st.session_state.logged_in:
        if st.session_state.page == "Forgot Password":
            show_forgot_password_page()
        else:
            show_login_page(is_first_user)
    else:
        with st.sidebar:

            st.title("CodeGenie 🧞")
            st.markdown(f"Welcome, **{st.session_state.username}**!")
            st.markdown("---")

            def set_page(page_name):
                # No st.rerun() here: Streamlit reruns the script after an
                # on_click callback anyway, and calling st.rerun() inside a
                # callback is a no-op that only triggers a warning.
                st.session_state.page = page_name

            def nav_button(label, page_name):
                """Sidebar button that switches pages; highlighted when active."""
                st.button(
                    label,
                    use_container_width=True,
                    type="primary" if st.session_state.page == page_name else "secondary",
                    on_click=set_page,
                    args=(page_name,),
                )

            nav_button("Code Generator", "Generator")
            nav_button("Code Explainer", "Explainer")
            nav_button("My Profile", "Profile")

            if st.session_state.role == 'admin':
                st.markdown("---")
                st.markdown("*Admin Tools*")
                nav_button("Admin Panel", "Admin")

            st.markdown("---")
            if st.button("Logout 🔒", use_container_width=True):
                # Dropping every key also discards the token and chat history.
                for key in list(st.session_state.keys()):
                    del st.session_state[key]
                st.rerun()

        if st.session_state.page == "Generator":
            show_generator_page()
        elif st.session_state.page == "Explainer":
            show_explainer_page()
        elif st.session_state.page == "Profile":
            show_profile_page()
        elif st.session_state.page == "Admin":
            show_admin_panel()
        else:
            # Unknown page while logged in (e.g. still "Login"): fall back to
            # the generator.
            st.session_state.page = "Generator"
            st.rerun()

if __name__ == "__main__":
    main()

In [None]:
import os
from pyngrok import ngrok, conf

# --- Cleanup and Setup ---
# Terminate any existing Streamlit process and ngrok tunnel for a clean start
os.system("pkill -f streamlit")
ngrok.kill()

# Get NGROK_TOKEN from the environment (set by the secrets cell)
NGROK_TOKEN = os.environ.get('NGROK_TOKEN')

# Set ngrok auth token
if NGROK_TOKEN:
    ngrok.set_auth_token(NGROK_TOKEN)
else:
    print("NGROK_TOKEN not found. Deployment will fail.")

# Start Streamlit in the background
os.system("streamlit run app.py &")

# Connect to Streamlit's default port 8501
try:
    tunnel = ngrok.connect(8501)
    # Print the plain URL rather than the tunnel object's repr. The getattr
    # fallback keeps this working if connect() returns the URL string itself.
    public_url = getattr(tunnel, "public_url", tunnel)

    print("----------------------------------------")
    print("Your CodeGenie app is live at:")
    print(public_url)
    print("----------------------------------------")
except Exception as e:
    print(f"Error starting ngrok: {e}")
    print("Please ensure your NGROK_TOKEN is correct and not in use.")