In [14]:
%%writefile requirements.txt
streamlit
streamlit-antd-components
PyJWT
pyngrok
PyPDF2
textstat
plotly

Overwriting requirements.txt


In [15]:
!pip install -r requirements.txt -q

In [16]:
%%writefile dashboard.py
import streamlit as st
import streamlit_antd_components as sac
import PyPDF2
import textstat
import plotly.graph_objects as go
import plotly.express as px
import pandas as pd
import re

def get_human_grade_level(score):
    """Translate a Flesch Reading Ease score into a school-grade label.

    The bands are scanned from the easiest text (highest score) downwards and
    the first band whose lower bound the score reaches is returned. Scores
    below 30 fall through to "College graduate".
    """
    grade_bands = (
        (90, "5th grade"),
        (80, "6th grade"),
        (70, "7th grade"),
        (60, "8th and 9th grade"),
        (50, "10th to 12th grade (High School)"),
        (30, "College"),
    )
    for lower_bound, label in grade_bands:
        if score >= lower_bound:
            return label
    return "College graduate"

def render_dashboard(username):
    """Render the readability dashboard page with Streamlit widgets.

    The user either uploads a PDF or pastes text. The text is whitespace-
    normalised, scored with several `textstat` readability formulas, and the
    results are shown as metric cards, a gauge, a bar chart and the raw text.

    Args:
        username: Supplied by the caller; it is not referenced anywhere in
            this function body.

    Returns:
        None. The function returns early (rendering at most a warning/error)
        when there is no text to analyse or the PDF cannot be read.
    """
    st.title("üìÑ Readability Dashboard")
    st.write("Upload a document or paste text to evaluate its readability and text complexity.")

    # Input source selector; the returned value is compared against the item labels below.
    input_mode = sac.segmented(
        items=[
            sac.SegmentedItem(label='Upload PDF', icon='file-earmark-pdf'),
            sac.SegmentedItem(label='Paste Text', icon='fonts'),
        ],
        align='center', use_container_width=True, color='primary'
    )

    # Stays empty unless a PDF yields text or the "Analyze Text" button was pressed.
    raw_text = ""

    with st.container(border=True):
        if input_mode == 'Upload PDF':
            uploaded_file = st.file_uploader("Drop your PDF document here", type=["pdf"])
            if uploaded_file is not None:
                with st.spinner("Extracting text and running linguistic analysis..."):
                    try:
                        reader = PyPDF2.PdfReader(uploaded_file)
                        # Only the first 20 pages are read; later pages are ignored.
                        num_pages = min(len(reader.pages), 20)
                        for i in range(num_pages):
                            page = reader.pages[i]
                            text = page.extract_text()
                            # Skip pages for which extraction produced nothing.
                            if text:
                                raw_text += text + " "
                    except Exception as e:
                        st.error(f"Error reading PDF: {e}")
                        return
        else:
            text_input = st.text_area("Paste your text here", height=250, placeholder="Enter text to analyze...")
            # The pasted text is only taken over on the run in which the button is clicked.
            if st.button("Analyze Text", type="primary"):
                raw_text = text_input

    if not raw_text.strip():
        # `uploaded_file` is only bound when the PDF branch above ran, hence the locals() check.
        if input_mode == 'Upload PDF' and 'uploaded_file' in locals() and uploaded_file:
            st.warning("No readable text found. Is this a scanned document or an image-based PDF?")
        return

    # Process Text
    # Collapse newlines and any run of whitespace into single spaces.
    extracted_text = raw_text.replace('\n', ' ')
    extracted_text = re.sub(r'\s+', ' ', extracted_text).strip()

    word_count = textstat.lexicon_count(extracted_text)
    sentence_count = textstat.sentence_count(extracted_text)

    if word_count == 0:
        st.warning("Not enough words to analyze.")
        return

    # 250 words per minute, rounded down, but never shown as less than 1 minute.
    reading_time_mins = max(1, int(word_count // 250))
    flesch_ease = textstat.flesch_reading_ease(extracted_text)
    # Clamp to the 0-100 range used by the gauge axis and the grade mapping.
    clamped_flesch = max(0, min(100, flesch_ease))
    human_grade = get_human_grade_level(clamped_flesch)

    fk_grade = textstat.flesch_kincaid_grade(extracted_text)
    gunning_fog = textstat.gunning_fog(extracted_text)
    smog = textstat.smog_index(extracted_text)

    sac.divider(label='Document Overview', align='center', color='gray')

    # Four summary cards: words, sentences, reading time, reading level.
    col1, col2, col3, col4 = st.columns(4)
    with col1:
        with st.container(border=True):
            st.metric("Total Words", f"{word_count:,}")
    with col2:
        with st.container(border=True):
            st.metric("Sentences", f"{sentence_count:,}")
    with col3:
        with st.container(border=True):
            st.metric("Est. Read Time", f"{reading_time_mins} min")
    with col4:
        with st.container(border=True):
            st.metric("Reading Level", human_grade)

    sac.divider(label='Deep Dive Analysis', align='center', color='gray')

    chart_col1, chart_col2 = st.columns(2)

    with chart_col1:
        with st.container(border=True):
            # Gauge of the clamped Flesch Reading Ease score with three coloured bands.
            fig_gauge = go.Figure(go.Indicator(
                mode = "gauge+number",
                value = clamped_flesch,
                domain = {'x': [0, 1], 'y': [0, 1]},
                title = {'text': "Flesch Reading Ease<br><span style='font-size:0.8em;color:gray'>Higher is easier (0-100)</span>"},
                gauge = {
                    'axis': {'range': [0, 100]},
                    'bar': {'color': "darkgray"},
                    'steps': [
                        {'range': [0, 30], 'color': "lightcoral"},
                        {'range': [30, 60], 'color': "gold"},
                        {'range': [60, 100], 'color': "lightgreen"}
                    ]
                }
            ))
            fig_gauge.update_layout(paper_bgcolor="rgba(0,0,0,0)", font={'color': "gray"}, height=350, margin=dict(l=20, r=20, t=50, b=20))
            st.plotly_chart(fig_gauge, use_container_width=True)

    with chart_col2:
        with st.container(border=True):
            # Bar chart comparing the three grade-level formulas side by side.
            data = {
                "Algorithm": ["Flesch-Kincaid", "Gunning Fog", "SMOG Index"],
                "Grade Level": [fk_grade, gunning_fog, smog]
            }
            df = pd.DataFrame(data)
            fig_bar = px.bar(
                df, x="Algorithm", y="Grade Level", color="Algorithm",
                title="Grade Level Equivalents", text_auto='.1f'
            )
            fig_bar.update_layout(paper_bgcolor="rgba(0,0,0,0)", font={'color': "gray"}, height=350, showlegend=False, margin=dict(l=20, r=20, t=50, b=20))
            st.plotly_chart(fig_bar, use_container_width=True)

            # Caveat shown for short inputs, where the SMOG value is less meaningful.
            if sentence_count < 30:
                st.caption("‚ö†Ô∏è **Note:** The SMOG Index requires at least 30 sentences for mathematical validity.")

    sac.divider(label='Source Material', align='center', color='gray')
    with st.expander("View Extracted Text"):
        st.write(extracted_text)

Overwriting dashboard.py


In [22]:
%%writefile app.py
import streamlit as st
import streamlit_antd_components as sac
import sqlite3
import hashlib
import jwt
import datetime
import re
import time
import os
import smtplib
import secrets
import pandas as pd
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from dashboard import render_dashboard

# --- CONFIGURATION ---
# Key used to sign and verify the session JWTs (HS256) further down in this file.
# NOTE(review): when JWT_SECRET_KEY is not set, the hard-coded fallback below is
# used, and that value is readable by anyone who can see this source. Confirm
# the variable is always provided outside local development.
SECRET_KEY = os.getenv("JWT_SECRET_KEY", "fallback_dev_key_123")
# Path of the SQLite database file opened by every database helper in this file.
DB_NAME = "app_data.db"

# Choices offered in the sign-up form's security-question dropdown.
SECURITY_QUESTIONS = [
    "What is your pet's name?",
    "What city were you born in?",
    "What is your mother's maiden name?",
    "What was the name of your first school?",
    "What was the make of your first car?"
]

# Write the Streamlit theme file. This is module-level code, so it runs (and
# overwrites any existing .streamlit/config.toml) every time this script executes.
os.makedirs(".streamlit", exist_ok=True)
with open(".streamlit/config.toml", "w") as f:
    f.write("""
[theme]
primaryColor="#00b4d8"
backgroundColor="#081021"
secondaryBackgroundColor="#141f38"
textColor="#caf0f8"
""")

# --- 1. DATABASE MANAGEMENT ---
def init_db():
    """Create the application tables if needed and migrate older `users` tables.

    Creates `users` and `password_history` when they do not exist, then makes
    sure the `role` and `last_login` columns are present on `users` for
    databases created before those columns were part of the schema.

    Each column migration runs in its own try/except. Previously both ALTER
    statements shared one `try`, so when `role` already existed the first
    statement raised and `last_login` was never added.
    """
    with sqlite3.connect(DB_NAME, timeout=10) as conn:
        c = conn.cursor()
        c.execute('''
            CREATE TABLE IF NOT EXISTS users (
                email TEXT PRIMARY KEY,
                username TEXT,
                password_hash TEXT,
                security_question TEXT,
                security_answer TEXT,
                failed_attempts INTEGER DEFAULT 0,
                lockout_until TEXT,
                otp_code TEXT,
                otp_expiry TEXT,
                role TEXT DEFAULT 'user',
                last_login TEXT
            )
        ''')

        column_migrations = (
            "ALTER TABLE users ADD COLUMN role TEXT DEFAULT 'user'",
            "ALTER TABLE users ADD COLUMN last_login TEXT",
        )
        for statement in column_migrations:
            try:
                c.execute(statement)
            except sqlite3.OperationalError:
                # Raised when the column already exists; that is the normal
                # case on an up-to-date database, so move on to the next one.
                pass

        c.execute('''
            CREATE TABLE IF NOT EXISTS password_history (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                email TEXT,
                password_hash TEXT,
                changed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        conn.commit()

def hash_data(data):
    """Return the hexadecimal SHA-256 digest of the string `data`.

    NOTE(review): this is a plain, unsalted hash and is used for passwords and
    security answers elsewhere in this file; consider a dedicated password
    hashing scheme if stored hashes can be migrated.
    """
    digest = hashlib.sha256()
    digest.update(data.encode())
    return digest.hexdigest()

def add_user(username, email, password, question, answer):
    """Register a new account and seed its password history.

    The email is trimmed and lower-cased, the password and the normalised
    security answer are stored as hashes, and the role is 'admin' only when
    the email equals the SUPER_ADMIN_EMAIL environment variable (compared
    case-insensitively).

    Returns:
        True when both rows were inserted, False when SQLite reports an
        integrity error (for example the email already exists).
    """
    with sqlite3.connect(DB_NAME, timeout=10) as conn:
        cursor = conn.cursor()
        normalized_email = email.strip().lower()
        password_digest = hash_data(password)
        answer_digest = hash_data(answer.strip().lower())

        configured_admin = os.getenv("SUPER_ADMIN_EMAIL", "")
        if normalized_email == configured_admin.lower():
            assigned_role = 'admin'
        else:
            assigned_role = 'user'

        new_user_row = (normalized_email, username, password_digest, question, answer_digest, 0, assigned_role)
        try:
            cursor.execute(
                'INSERT INTO users (email, username, password_hash, security_question, security_answer, failed_attempts, role) VALUES (?,?,?,?,?,?,?)',
                new_user_row,
            )
            cursor.execute(
                'INSERT INTO password_history (email, password_hash) VALUES (?,?)',
                (normalized_email, password_digest),
            )
            conn.commit()
        except sqlite3.IntegrityError:
            return False
        return True

def authenticate_user(email, password):
    """Check an email/password pair and maintain the failed-attempt lockout.

    Args:
        email: Address as typed by the user; trimmed and lower-cased before lookup.
        password: Plain-text password; hashed with `hash_data` for comparison.

    Returns:
        A `(user, message)` tuple. On success `user` is a dict with the keys
        "username", "email" and "role" and `message` is "Success". On any
        failure `user` is None and `message` is the text to show the user.

    Side effects:
        Updates `failed_attempts`, `lockout_until` and `last_login` on the
        matching `users` row.
    """
    with sqlite3.connect(DB_NAME, timeout=10) as conn:
        c = conn.cursor()
        safe_email = email.strip().lower()
        p_hash = hash_data(password)

        c.execute('SELECT username, password_hash, failed_attempts, lockout_until, role FROM users WHERE email=?', (safe_email,))
        row = c.fetchone()

        # Unknown email: nothing to count against, report a generic failure.
        if not row:
            return None, "Invalid Email or Password."

        username, true_hash, failed_attempts, lockout_until, role = row
        # A NULL counter in the database is treated as zero.
        failed_attempts = failed_attempts or 0

        if lockout_until:
            # Stored as an ISO string produced from datetime.now() (naive local time).
            lockout_time = datetime.datetime.fromisoformat(lockout_until)
            if datetime.datetime.now() < lockout_time:
                # Whole minutes left, rounded up so the message never says "0 minute(s)".
                remaining = int((lockout_time - datetime.datetime.now()).total_seconds() // 60) + 1
                return None, f"Account locked. Try again in {remaining} minute(s)."
            else:
                # Lockout has expired: clear it and start counting from zero again.
                failed_attempts = 0
                c.execute('UPDATE users SET failed_attempts=0, lockout_until=NULL WHERE email=?', (safe_email,))
                conn.commit()

        if p_hash == true_hash:
            # Successful login resets the counter and records the login time.
            now_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            c.execute('UPDATE users SET failed_attempts=0, lockout_until=NULL, last_login=? WHERE email=?', (now_str, safe_email))
            conn.commit()
            return {"username": username, "email": safe_email, "role": role}, "Success"
        else:
            failed_attempts += 1
            # The fifth consecutive failure locks the account for five minutes.
            if failed_attempts >= 5:
                lock_time = (datetime.datetime.now() + datetime.timedelta(minutes=5)).isoformat()
                c.execute('UPDATE users SET failed_attempts=?, lockout_until=? WHERE email=?', (failed_attempts, lock_time, safe_email))
                conn.commit()
                return None, "Account locked for 5 minutes due to too many failed attempts."
            else:
                c.execute('UPDATE users SET failed_attempts=? WHERE email=?', (failed_attempts, safe_email))
                conn.commit()
                return None, f"Invalid Email or Password. {5 - failed_attempts} attempt(s) remaining."

# --- 2. ADMIN DATABASE FUNCTIONS ---
def get_all_users():
    """Load username, email, role and last login of every user into a DataFrame."""
    directory_sql = "SELECT username, email, role, last_login FROM users"
    with sqlite3.connect(DB_NAME, timeout=10) as conn:
        users_frame = pd.read_sql_query(directory_sql, conn)
    return users_frame

def delete_user(email):
    """Remove a user row and all of that email's password-history rows.

    The email is used exactly as given (no trimming or case folding).
    """
    removal_statements = (
        "DELETE FROM users WHERE email=?",
        "DELETE FROM password_history WHERE email=?",
    )
    with sqlite3.connect(DB_NAME, timeout=10) as conn:
        cursor = conn.cursor()
        for statement in removal_statements:
            cursor.execute(statement, (email,))
        conn.commit()

def promote_user_to_admin(email):
    """Set the role of the user with the given email to 'admin'.

    The email is matched exactly as given; no row is changed if it is unknown.
    """
    with sqlite3.connect(DB_NAME, timeout=10) as conn:
        cursor = conn.cursor()
        target = (email,)
        cursor.execute("UPDATE users SET role='admin' WHERE email=?", target)
        conn.commit()

# --- 3. OTP, EMAIL & RECOVERY FUNCTIONS ---
def send_otp_email(receiver_email, otp_code):
    """Email a one-time code to `receiver_email` through Gmail SMTP.

    Credentials come from the GMAIL_USER and GMAIL_APP_PASS environment
    variables.

    Args:
        receiver_email: Destination address; also shown inside the message body.
        otp_code: The code to display in the message.

    Returns:
        True when the message was handed to the SMTP server, False when
        credentials are missing or anything goes wrong while sending.
    """
    sender_email = os.getenv('GMAIL_USER')
    sender_password = os.getenv('GMAIL_APP_PASS')

    # Check credentials before touching the network. The previous version
    # connected first and then returned without closing the connection.
    if not sender_email or not sender_password:
        return False

    try:
        msg = MIMEMultipart()
        msg['From'] = f"Infosys LLM <{sender_email}>"
        msg['To'] = receiver_email
        msg['Subject'] = "üîê Infosys LLM - Security OTP"

        body = f"""
        <!DOCTYPE html><html><head><style>
        .container {{ font-family: 'Courier New', monospace; background-color: #0e1117; padding: 40px; text-align: center; color: #ffffff; }}
        .card {{ background-color: #1f2937; border-radius: 12px; box-shadow: 0 0 20px rgba(0, 255, 204, 0.2); padding: 40px; max-width: 500px; margin: 0 auto; border: 1px solid #374151; }}
        .header {{ color: #00ffcc; font-size: 24px; font-weight: 600; margin-bottom: 20px; text-shadow: 0 0 5px #00ffcc; }}
        .otp-box {{ background-color: #0e1117; color: #00ffcc; font-size: 32px; font-weight: 700; letter-spacing: 8px; padding: 20px; border-radius: 8px; margin: 30px 0; display: inline-block; border: 1px solid #00ffcc; box-shadow: 0 0 10px rgba(0, 255, 204, 0.3); }}
        .text {{ color: #9ca3af; font-size: 16px; line-height: 1.5; margin-bottom: 20px; }}
        .footer {{ color: #6b7280; font-size: 12px; margin-top: 30px; }}
        </style></head><body><div class="container"><div class="card">
        <div class="header">‚ö° Infosys LLM Security</div>
        <div class="text">Use this OTP to verify your action for account: <span style="color:#00ffcc;">{receiver_email}</span>.</div>
        <div class="otp-box">{otp_code}</div>
        <div class="text">Valid for <strong>5 minutes</strong>.</div>
        <div class="footer">&copy; 2026 Infosys LLM Secure Auth</div>
        </div></div></body></html>
        """

        msg.attach(MIMEText(body, 'html'))

        # The context manager closes the connection on every exit path,
        # including exceptions; the timeout keeps an unreachable mail server
        # from blocking the caller indefinitely.
        with smtplib.SMTP('smtp.gmail.com', 587, timeout=10) as server:
            server.starttls()
            server.login(sender_email, sender_password)
            server.sendmail(sender_email, receiver_email, msg.as_string())
        return True
    except Exception:
        # Best effort: callers only need to know whether the mail went out.
        return False

def generate_and_store_otp(email):
    """Create a 6-digit one-time code, store it for the user, and return it.

    The code is drawn with `secrets.randbelow` and zero-padded to six digits.
    It is saved on the user's row (email trimmed and lower-cased) together
    with an expiry five minutes from now.
    """
    code = f"{secrets.randbelow(1000000):06d}"
    valid_until = datetime.datetime.now() + datetime.timedelta(minutes=5)
    expiry_text = valid_until.isoformat()
    with sqlite3.connect(DB_NAME, timeout=10) as conn:
        cursor = conn.cursor()
        normalized_email = email.strip().lower()
        cursor.execute(
            "UPDATE users SET otp_code=?, otp_expiry=? WHERE email=?",
            (code, expiry_text, normalized_email),
        )
        conn.commit()
    return code

def verify_otp(email, user_otp):
    """Validate a one-time code for `email` and consume it on success.

    Returns:
        `(True, "Success")` when the stored code matches and has not expired;
        the stored code and expiry are then cleared. Otherwise `(False, reason)`
        where `reason` is the message to display.
    """
    with sqlite3.connect(DB_NAME, timeout=10) as conn:
        cursor = conn.cursor()
        cursor.execute("SELECT otp_code, otp_expiry FROM users WHERE email=?", (email.strip().lower(),))
        record = cursor.fetchone()

        # No such user, or no code/expiry currently stored for them.
        if not record or not (record[0] and record[1]):
            return False, "Invalid or expired OTP."

        stored_code, expiry_text = record
        expires_at = datetime.datetime.fromisoformat(expiry_text)

        if datetime.datetime.now() > expires_at:
            return False, "OTP has expired. Please log in again."

        if stored_code != user_otp.strip():
            return False, "Incorrect OTP."

        # Single use: wipe the code so it cannot be replayed.
        cursor.execute("UPDATE users SET otp_code=NULL, otp_expiry=NULL WHERE email=?", (email.strip().lower(),))
        conn.commit()
        return True, "Success"

def get_security_question(email):
    """Return the stored security question for `email`, or None if unknown.

    The email is trimmed and lower-cased before the lookup.
    """
    with sqlite3.connect(DB_NAME, timeout=10) as conn:
        cursor = conn.cursor()
        normalized_email = email.strip().lower()
        cursor.execute('SELECT security_question FROM users WHERE email=?', (normalized_email,))
        match = cursor.fetchone()
        if match:
            return match[0]
        return None

def verify_security_answer(email, answer):
    """Check a security answer against the stored hash for `email`.

    Both the email and the answer are trimmed and lower-cased before use, so
    the comparison is case-insensitive.

    Returns:
        True when the user exists and the hashed answer matches, otherwise
        False. (An unknown email previously produced None; it is now always a
        bool, which stays compatible with truthiness checks in callers.)
    """
    with sqlite3.connect(DB_NAME, timeout=10) as conn:
        c = conn.cursor()
        safe_email = email.strip().lower()
        a_hash = hash_data(answer.strip().lower())
        c.execute('SELECT security_answer FROM users WHERE email=?', (safe_email,))
        result = c.fetchone()
        return result is not None and result[0] == a_hash

def check_password_history(email, new_password):
    """Tell whether `new_password` may be used, given the recent history.

    Returns:
        False when the hash of `new_password` equals one of the three most
        recent password hashes recorded for the email, True otherwise.
    """
    with sqlite3.connect(DB_NAME, timeout=10) as conn:
        cursor = conn.cursor()
        normalized_email = email.strip().lower()
        candidate_hash = hash_data(new_password)
        cursor.execute(
            'SELECT password_hash FROM password_history WHERE email=? ORDER BY id DESC LIMIT 3',
            (normalized_email,),
        )
        recent_hashes = [record[0] for record in cursor.fetchall()]
        return candidate_hash not in recent_hashes

def update_password(email, new_password):
    """Store a new password hash and keep only the last three history rows.

    The user's `password_hash` is replaced, the new hash is appended to
    `password_history`, and any history rows for this email beyond the three
    newest (by id) are deleted.
    """
    prune_old_history_sql = '''
            DELETE FROM password_history
            WHERE email = ?
            AND id NOT IN (
                SELECT id FROM password_history
                WHERE email = ?
                ORDER BY id DESC
                LIMIT 3
            )
        '''
    with sqlite3.connect(DB_NAME, timeout=10) as conn:
        cursor = conn.cursor()
        normalized_email = email.strip().lower()
        fresh_hash = hash_data(new_password)
        cursor.execute('UPDATE users SET password_hash=? WHERE email=?', (fresh_hash, normalized_email))
        cursor.execute('INSERT INTO password_history (email, password_hash) VALUES (?,?)', (normalized_email, fresh_hash))
        cursor.execute(prune_old_history_sql, (normalized_email, normalized_email))
        conn.commit()

# --- 4. VALIDATION FUNCTIONS ---
def create_jwt_token(user_data):
    """Issue a signed session token (HS256) that expires in one hour.

    Args:
        user_data: Mapping with the keys 'username', 'email' and 'role'.

    Returns:
        The encoded JWT carrying `sub`, `email`, `role` and `exp` claims.
    """
    # Timezone-aware UTC "now"; datetime.utcnow() is deprecated and returns a
    # naive value that is easy to misinterpret as local time.
    expires_at = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(hours=1)
    payload = {
        "sub": user_data['username'],
        "email": user_data['email'],
        "role": user_data['role'],
        "exp": expires_at
    }
    return jwt.encode(payload, SECRET_KEY, algorithm="HS256")

def verify_jwt_token(token):
    """Decode and validate a session token.

    Returns:
        The decoded payload dict when the signature and expiry are valid,
        otherwise None.
    """
    try:
        return jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
    except jwt.PyJWTError:
        # Expired, tampered or malformed token. A bare `except:` would also
        # have swallowed KeyboardInterrupt/SystemExit.
        return None

def validate_username(username):
    """Validate a username and return `(ok, error_message)`.

    Rules are evaluated in order and the first violated rule supplies the
    message: non-empty, no spaces, 3-25 characters, alphanumeric only.
    `error_message` is "" when the username is acceptable.
    """
    # Each rule is a lazily evaluated "is violated" check plus its message.
    rules = (
        (lambda: not username, "Username cannot be empty."),
        (lambda: " " in username, "Username cannot contain spaces."),
        (lambda: not 3 <= len(username) <= 25, "Username must be between 3 and 25 characters."),
        (lambda: not username.isalnum(), "Username must contain only letters and numbers (no special characters)."),
    )
    for is_violated, message in rules:
        if is_violated():
            return False, message
    return True, ""

def validate_email(email):
    """Validate an email address and return `(ok, error_message)`.

    Checks, in order: non-empty, no spaces, at most 254 characters, and a
    `local@domain.tld` shape. `error_message` is "" when the address passes.
    """
    if not email:
        return False, "Email cannot be empty."
    if " " in email:
        return False, "Email cannot contain spaces."
    if len(email) > 254:
        return False, "Email is too long (maximum 254 characters)."
    pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
    # fullmatch, not match: with re.match the trailing `$` also matches just
    # before a final newline, so "user@domain.com\n" used to be accepted.
    if not re.fullmatch(pattern, email):
        return False, "Invalid email format (e.g., user@domain.com)."
    return True, ""

def validate_password(password):
    """Validate a password and return `(ok, error_message)`.

    Rules are evaluated in order and the first violated rule supplies the
    message: non-empty, no spaces, 8-128 characters, at least one uppercase
    A-Z letter, at least one digit. `error_message` is "" on success.
    """
    # Each rule is a lazily evaluated "is violated" check plus its message.
    rules = (
        (lambda: not password, "Password cannot be empty."),
        (lambda: " " in password, "Password cannot contain spaces."),
        (lambda: not 8 <= len(password) <= 128, "Password must be between 8 and 128 characters."),
        (lambda: re.search(r"[A-Z]", password) is None, "Password must contain at least one uppercase letter."),
        (lambda: re.search(r"\d", password) is None, "Password must contain at least one number."),
    )
    for is_violated, message in rules:
        if is_violated():
            return False, message
    return True, ""

def validate_security_answer(answer):
    """Validate a security answer and return `(ok, error_message)`.

    The answer must be non-empty and at least three characters long;
    `error_message` is "" when it is acceptable.
    """
    if not answer:
        return False, "Security answer cannot be empty."
    long_enough = len(answer) >= 3
    return (True, "") if long_enough else (False, "Security answer must be at least 3 characters long.")

# --- 5. UI COMPONENTS & SESSION STATE ---
# Page chrome first, then make sure the database schema exists.
st.set_page_config(page_title="Secure Auth & Dashboard", page_icon="üîí", layout="wide")
init_db()

st.markdown("""
<style>
button[kind="secondary"] {
    transition: 0.3s;
}
</style>
""", unsafe_allow_html=True)

# Session keys the rest of the script reads, with the value each starts at.
# A key is only seeded when absent, so values set on earlier runs survive.
_SESSION_DEFAULTS = {
    'jwt_token': None,
    'login_step': 1,
    'forgot_step': 1,
    'admin_action': None,
    'pending_email': None,
    'pending_user': None,
    'confirm_delete_email': None,
}
for _state_key, _initial_value in _SESSION_DEFAULTS.items():
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _initial_value

# --- 6. MAIN APP LOGIC ---
# Decode the token kept in session state; None means "not logged in"
# (no token, or verify_jwt_token rejected it).
user_session = verify_jwt_token(st.session_state.jwt_token) if st.session_state.jwt_token else None

if user_session:
    # ---------- Logged-in view ----------
    with st.sidebar:
        st.write(f"### üë§ {user_session['sub']}")
        st.caption(f"Role: **{user_session['role'].upper()}**")
        st.divider()
        if st.button("Logout", type="primary", use_container_width=True):
            # Dropping the token is the whole logout; the rerun shows the auth forms.
            st.session_state.jwt_token = None
            st.rerun()

    # The role comes from the token payload, i.e. as it was at login time.
    if user_session['role'] == 'admin':
        st.title("üõ°Ô∏è System Administration")
        st.write("Manage users, monitor logins, and control system access.")

        df_users = get_all_users()

        sac.divider(label='User Management', align='center', color='gray')
        col_del, col_prom = st.columns(2)

        with col_del:
            with st.container(border=True):
                st.write("### üóëÔ∏è Remove User")
                # The logged-in admin is excluded so they cannot delete themselves.
                safe_to_delete = df_users[df_users['email'] != user_session['email']]

                # Two-step delete: first pick a user, then confirm on the next run.
                if st.session_state.confirm_delete_email:
                    target = st.session_state.confirm_delete_email
                    st.warning(f"Are you sure you want to permanently delete **{target}**?")
                    c1, c2 = st.columns(2)
                    with c1:
                        if st.button("üö® Confirm Delete", use_container_width=True):
                            delete_user(target)
                            st.success(f"User {target} removed.")
                            st.session_state.confirm_delete_email = None
                            # Short pause so the success message is visible before the rerun.
                            time.sleep(1)
                            st.rerun()
                    with c2:
                        if st.button("Cancel", use_container_width=True):
                            st.session_state.confirm_delete_email = None
                            st.rerun()
                else:
                    del_target = st.selectbox("Select User to Remove", safe_to_delete['email'].tolist(), key="del_box")
                    if st.button("Delete Selected User", type="primary"):
                        if del_target:
                            st.session_state.confirm_delete_email = del_target
                            st.rerun()

        with col_prom:
            with st.container(border=True):
                st.write("### üîë Promote to Admin")

                # Promotion is also two-step: choose a user, then confirm with an
                # OTP that is emailed to the acting admin (not to the target user).
                if st.session_state.admin_action != 'promoting':
                    eligible_users = df_users[df_users['role'] == 'user']
                    prom_target = st.selectbox("Select User to Promote", eligible_users['email'].tolist(), key="prom_box")

                    if st.button("Initiate Promotion"):
                        if prom_target:
                            st.session_state.target_promote_email = prom_target
                            otp = generate_and_store_otp(user_session['email'])
                            if send_otp_email(user_session['email'], otp):
                                st.session_state.admin_action = 'promoting'
                                st.rerun()
                            else:
                                st.error("Failed to send verification email.")
                else:
                    st.info(f"Verify your identity to promote **{st.session_state.target_promote_email}**.")
                    admin_otp = st.text_input("Enter OTP sent to your email", max_chars=6)

                    if st.button("Verify & Promote", type="primary"):
                        # The detailed failure reason is discarded; a generic error is shown.
                        is_valid, _ = verify_otp(user_session['email'], admin_otp)
                        if is_valid:
                            promote_user_to_admin(st.session_state.target_promote_email)
                            st.success("User promoted successfully!")
                            st.session_state.admin_action = None
                            time.sleep(1.5)
                            st.rerun()
                        else:
                            st.error("Invalid OTP.")
                    if st.button("Cancel Promotion"):
                        st.session_state.admin_action = None
                        st.rerun()

        sac.divider(label='User Directory', align='center', color='gray')
        st.dataframe(df_users, use_container_width=True, hide_index=True)

    else:
        # Non-admin users get the readability dashboard.
        render_dashboard(user_session['sub'])

else:
    # ---------- Logged-out view: login / sign-up / password recovery ----------
    col1, col2, col3 = st.columns([1, 2, 1])
    with col2:
        # The mode switcher is only shown while both flows are at their first
        # step; once a multi-step flow is in progress the mode is pinned to it.
        if st.session_state.login_step == 1 and st.session_state.forgot_step == 1:
            auth_mode = sac.segmented(
                items=[
                    sac.SegmentedItem(label='Login', icon='box-arrow-in-right'),
                    sac.SegmentedItem(label='Sign Up', icon='person-plus-fill'),
                    sac.SegmentedItem(label='Forgot Password', icon='key'),
                ],
                align='center',
                use_container_width=True,
                color='primary'
            )
        else:
            if st.session_state.login_step > 1:
                auth_mode = 'Login'
            else:
                auth_mode = 'Forgot Password'

        with st.container(border=True):
            if auth_mode == 'Login':
                # Login step 1: email + password, then an OTP is emailed.
                if st.session_state.login_step == 1:
                    st.subheader("Sign In")
                    email = st.text_input("Email Address", key="login_email")
                    password = st.text_input("Password", type="password", key="login_pass")

                    if st.button("Log In", type="primary", use_container_width=True):
                        clean_email = email.strip()
                        is_valid_email, email_err = validate_email(clean_email)

                        if not is_valid_email:
                            st.error(email_err)
                        elif not password:
                            st.error("Password cannot be empty.")
                        else:
                            user_data, message = authenticate_user(clean_email, password)
                            if user_data:
                                with st.spinner("Sending secure code to your email..."):
                                    otp = generate_and_store_otp(clean_email)
                                    if send_otp_email(clean_email, otp):
                                        # Keep the authenticated user aside until the OTP is confirmed.
                                        st.session_state.pending_email = clean_email
                                        st.session_state.pending_user = user_data
                                        st.session_state.login_step = 2
                                        st.rerun()
                                    else:
                                        st.error("Failed to send email. Please check server settings.")
                            else:
                                st.error(message)

                # Login step 2: confirm the emailed code, then issue the JWT.
                elif st.session_state.login_step == 2:
                    st.subheader("Two-Factor Authentication")
                    st.info(f"An email with a 6-digit code has been sent to **{st.session_state.pending_email}**.")
                    entered_otp = st.text_input("Enter 6-digit Code", max_chars=6)

                    if st.button("Verify & Log In", type="primary", use_container_width=True):
                        if not entered_otp.strip():
                            st.error("Please enter the code.")
                        else:
                            is_valid, msg = verify_otp(st.session_state.pending_email, entered_otp)
                            if is_valid:
                                st.session_state.jwt_token = create_jwt_token(st.session_state.pending_user)
                                # Reset the flow state now that the session token exists.
                                st.session_state.login_step = 1
                                st.session_state.pending_email = None
                                st.session_state.pending_user = None
                                st.rerun()
                            else:
                                st.error(msg)

                    if st.button("Cancel & Go Back"):
                        st.session_state.login_step = 1
                        st.session_state.pending_email = None
                        st.session_state.pending_user = None
                        st.rerun()

            elif auth_mode == 'Sign Up':
                st.subheader("Create Account")
                new_user = st.text_input("Username")
                new_email = st.text_input("Email Address")
                st.caption("Password must be 8-128 chars, contain 1 Upper Case, 1 Number, and NO spaces.")
                pass1 = st.text_input("Password", type="password")
                pass2 = st.text_input("Confirm Password", type="password")
                sec_q = st.selectbox("Select Security Question", SECURITY_QUESTIONS)
                sec_a = st.text_input("Security Answer")

                if st.button("Sign Up", type="primary", use_container_width=True):
                    clean_user = new_user.strip()
                    clean_email = new_email.strip()
                    clean_ans = sec_a.strip()

                    # All validators run up front; only the first failure is reported.
                    user_ok, user_err = validate_username(clean_user)
                    email_ok, email_err = validate_email(clean_email)
                    pass_ok, pass_err = validate_password(pass1)
                    ans_ok, ans_err = validate_security_answer(clean_ans)

                    if not user_ok:
                        st.error(user_err)
                    elif not email_ok:
                        st.error(email_err)
                    elif not pass_ok:
                        st.error(pass_err)
                    elif pass1 != pass2:
                        st.error("Passwords do not match.")
                    elif not ans_ok:
                        st.error(ans_err)
                    else:
                        success = add_user(clean_user, clean_email, pass1, sec_q, clean_ans)
                        if success:
                            st.success("Account Created! Please switch to Login tab.")
                        else:
                            st.error("Registration failed. Email might already exist.")

            elif auth_mode == 'Forgot Password':
                # Recovery is a four-step flow tracked by `forgot_step`:
                # 1 find account -> 2 security question -> 3 emailed OTP -> 4 new password.
                if st.session_state.forgot_step == 1:
                    st.subheader("Reset Password")
                    f_email = st.text_input("Enter your registered Email")

                    if st.button("Find Account"):
                        clean_email = f_email.strip()
                        email_ok, email_err = validate_email(clean_email)

                        if not email_ok:
                            st.error(email_err)
                        else:
                            question = get_security_question(clean_email)
                            if question:
                                st.session_state.pending_email = clean_email
                                st.session_state.security_q_display = question
                                st.session_state.forgot_step = 2
                                st.rerun()
                            else:
                                st.error("Email not found.")

                elif st.session_state.forgot_step == 2:
                    st.subheader("Security Question")
                    st.info(f"Question: {st.session_state.security_q_display}")
                    ans = st.text_input("Your Answer")

                    if st.button("Verify Answer", type="primary", use_container_width=True):
                        clean_ans = ans.strip()
                        ans_ok, ans_err = validate_security_answer(clean_ans)

                        if not ans_ok:
                            st.error(ans_err)
                        elif verify_security_answer(st.session_state.pending_email, clean_ans):
                            with st.spinner("Answer correct! Sending OTP to your email..."):
                                otp = generate_and_store_otp(st.session_state.pending_email)
                                if send_otp_email(st.session_state.pending_email, otp):
                                    st.session_state.forgot_step = 3
                                    st.rerun()
                                else:
                                    st.error("Failed to send OTP email.")
                        else:
                            st.error("Incorrect Answer.")

                    if st.button("Cancel"):
                        st.session_state.forgot_step = 1
                        st.session_state.pending_email = None
                        st.rerun()

                elif st.session_state.forgot_step == 3:
                    st.subheader("Email Verification")
                    st.info(f"An email with a 6-digit code has been sent to **{st.session_state.pending_email}**.")
                    entered_otp = st.text_input("Enter 6-digit Code", max_chars=6)

                    if st.button("Verify OTP", type="primary", use_container_width=True):
                        if not entered_otp.strip():
                            st.error("Please enter the code.")
                        else:
                            is_valid, msg = verify_otp(st.session_state.pending_email, entered_otp)
                            if is_valid:
                                st.session_state.forgot_step = 4
                                st.rerun()
                            else:
                                st.error(msg)

                    if st.button("Cancel"):
                        st.session_state.forgot_step = 1
                        st.session_state.pending_email = None
                        st.rerun()

                elif st.session_state.forgot_step == 4:
                    st.subheader("Set New Password")
                    st.success("Identity Verified!")
                    new_p1 = st.text_input("New Password", type="password")
                    new_p2 = st.text_input("Confirm New Password", type="password")

                    if st.button("Update Password", type="primary", use_container_width=True):
                        pass_ok, pass_err = validate_password(new_p1)

                        if not pass_ok:
                            st.error(pass_err)
                        elif new_p1 != new_p2:
                            st.error("Passwords do not match.")
                        else:
                            # Reject a password that matches one of the last three stored hashes.
                            if check_password_history(st.session_state.pending_email, new_p1):
                                update_password(st.session_state.pending_email, new_p1)
                                st.success("Password Updated! Click below to go back to Login.")
                                # No rerun here, so the success message stays on screen;
                                # the flow state is already reset for the next interaction.
                                st.session_state.forgot_step = 1
                                st.session_state.pending_email = None
                            else:
                                st.error("You cannot reuse any of your last 3 passwords.")

                    if st.button("Back to Login"):
                        st.session_state.forgot_step = 1
                        st.session_state.pending_email = None
                        st.rerun()

Overwriting app.py


In [24]:
from pyngrok import ngrok
import subprocess
import time
import os
from google.colab import userdata

# Launches app.py with Streamlit in a background process and exposes it through
# an ngrok tunnel. Secrets are read from Colab's `userdata` store.
import secrets

# 0. CLEANUP
# Stop any tunnel or Streamlit server left over from a previous run of this cell.
ngrok.kill()
os.system("pkill -f streamlit")
time.sleep(2)

# 1. AUTHENTICATE NGROK
NGROK_AUTH_TOKEN = userdata.get('NGROK_AUTH_TOKEN')
ngrok.set_auth_token(NGROK_AUTH_TOKEN)

# 2. INJECT SECRETS AS ENVIRONMENT VARIABLES (This is the crucial step!)
# The Streamlit subprocess cannot read Colab's userdata store, so everything
# app.py reads via os.getenv has to be placed in the child's environment here.
my_env = os.environ.copy()
my_env["GMAIL_USER"] = userdata.get('GMAIL_USER')
my_env["GMAIL_APP_PASS"] = userdata.get('GMAIL_APP_PASS')
my_env["SUPER_ADMIN_EMAIL"] = userdata.get('SUPER_ADMIN_EMAIL')
# app.py falls back to a hard-coded signing key when JWT_SECRET_KEY is unset.
# Unless one is already present in the environment, generate a random key for
# this launch so tokens are never signed with the published fallback value.
my_env.setdefault("JWT_SECRET_KEY", secrets.token_hex(32))

# 3. RUN STREAMLIT
print("Starting Streamlit...")

# Pass the 'my_env' dictionary into the subprocess!
# The child keeps its own handle on the log file, so the parent's handle can be
# closed as soon as the process has been started.
with open('streamlit_log.txt', 'w') as log_file:
    process = subprocess.Popen(
        ['streamlit', 'run', 'app.py', '--server.port', '8501'],
        stdout=log_file,
        stderr=subprocess.STDOUT,
        env=my_env
    )

# Give Streamlit a moment to boot (or crash) before inspecting its output.
time.sleep(5)

print("\n--- üìù STREAMLIT STARTUP LOGS ---")
with open('streamlit_log.txt', 'r') as f:
    logs = f.read()
    print(logs if logs else "No logs generated yet. Process might be stuck.")
print("---------------------------------\n")

# poll() returns an exit code once the process has terminated, None while it runs.
if process.poll() is not None:
    print("üö® ERROR: Streamlit crashed. Check the logs above for the reason.")
else:
    try:
        public_url = ngrok.connect(8501).public_url
        print(f"‚úÖ Your Streamlit app is live at: {public_url}")
    except Exception as e:
        print(f"üö® NGROK ERROR: {e}")

Starting Streamlit...


KeyboardInterrupt: 