In [None]:
!pip install streamlit pyjwt bcrypt python-dotenv pyngrok nltk streamlit-option-menu plotly textstat PyPDF2 -q

In [None]:
%%writefile db.py
import sqlite3
import bcrypt
import datetime
import time


DB_NAME = "users.db"

# ----------------- Database Init -----------------
def init_db():
    conn = sqlite3.connect(DB_NAME)
    c = conn.cursor()
    # Users table
    c.execute('''CREATE TABLE IF NOT EXISTS users
                 (email TEXT PRIMARY KEY, password BLOB, created_at TEXT)''')
    # Password History
    c.execute('''CREATE TABLE IF NOT EXISTS password_history
                 (id INTEGER PRIMARY KEY AUTOINCREMENT,
                  email TEXT,
                  password BLOB,
                  set_at TEXT,
                  FOREIGN KEY(email) REFERENCES users(email))''')
    # Login Attempts
    c.execute('''CREATE TABLE IF NOT EXISTS login_attempts
                 (email TEXT PRIMARY KEY,
                  attempts INTEGER DEFAULT 0,
                  last_attempt REAL)''')
    conn.commit()
    conn.close()

def _get_timestamp():
    return datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")

# ----------------- User Registration -----------------
def register_user(email, password):
    conn = sqlite3.connect(DB_NAME)
    c = conn.cursor()
    try:
        hashed = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt())
        now = _get_timestamp()
        c.execute("INSERT INTO users (email, password, created_at) VALUES (?, ?, ?)", (email, hashed, now))
        c.execute("INSERT INTO password_history (email, password, set_at) VALUES (?, ?, ?)", (email, hashed, now))
        conn.commit()
        return True
    except sqlite3.IntegrityError:
        return False
    finally:
        conn.close()

# ----------------- Authentication -----------------
def authenticate_user(email, password):
    conn = sqlite3.connect(DB_NAME)
    c = conn.cursor()
    c.execute("SELECT password FROM users WHERE email = ?", (email,))
    row = c.fetchone()
    conn.close()
    if row:
        stored_hash = row[0]

        # Convert SQLite BLOB/memoryview to bytes
        if isinstance(stored_hash, memoryview):
            stored_hash = stored_hash.tobytes()
        elif isinstance(stored_hash, str):
            stored_hash = stored_hash.encode('utf-8')  # fallback

        if bcrypt.checkpw(password.encode('utf-8'), stored_hash):
            _reset_attempts(email)
            return True
    _record_failed_attempt(email)
    return False

# ----------------- Password History -----------------
def check_is_old_password(email, password):
    conn = sqlite3.connect(DB_NAME)
    c = conn.cursor()
    c.execute("SELECT password, set_at FROM password_history WHERE email = ? ORDER BY set_at DESC", (email,))
    history = c.fetchall()
    conn.close()
    for stored_hash, set_at in history:
        if isinstance(stored_hash, memoryview): stored_hash = stored_hash.tobytes()
        if isinstance(stored_hash, str): stored_hash = stored_hash.encode('utf-8')
        if bcrypt.checkpw(password.encode('utf-8'), stored_hash):
            return set_at
    return None

def check_password_reused(email, new_password):
    conn = sqlite3.connect(DB_NAME)
    c = conn.cursor()
    c.execute("SELECT password FROM password_history WHERE email = ?", (email,))
    history = c.fetchall()
    conn.close()
    for (stored_hash,) in history:
        if isinstance(stored_hash, memoryview): stored_hash = stored_hash.tobytes()
        if isinstance(stored_hash, str): stored_hash = stored_hash.encode('utf-8')
        if bcrypt.checkpw(new_password.encode('utf-8'), stored_hash):
            return True
    return False

def update_password(email, new_password):
    conn = sqlite3.connect(DB_NAME)
    c = conn.cursor()
    hashed = bcrypt.hashpw(new_password.encode('utf-8'), bcrypt.gensalt())
    now = _get_timestamp()
    c.execute("UPDATE users SET password = ? WHERE email = ?", (hashed, email))
    c.execute("INSERT INTO password_history (email, password, set_at) VALUES (?, ?, ?)", (email, hashed, now))
    conn.commit()
    conn.close()

####forgot password

def save_security_question(email, question, answer):
    conn = sqlite3.connect(DB_NAME)
    c = conn.cursor()
    hashed = bcrypt.hashpw(answer.encode(), bcrypt.gensalt())
    c.execute("""
        CREATE TABLE IF NOT EXISTS security_questions (
            email TEXT PRIMARY KEY,
            question TEXT,
            answer_hash BLOB
        )
    """)
    c.execute("""
        INSERT OR REPLACE INTO security_questions (email, question, answer_hash)
        VALUES (?, ?, ?)
    """, (email, question, hashed))
    conn.commit()
    conn.close()

def get_security_question(email):
    conn = sqlite3.connect(DB_NAME)
    c = conn.cursor()
    c.execute("SELECT question, answer_hash FROM security_questions WHERE email = ?", (email,))
    row = c.fetchone()
    conn.close()
    if row:
        return row[0], row[1]
    else:
        return None, None

# ----------------- User Existence -----------------
def check_user_exists(email):
    conn = sqlite3.connect(DB_NAME)
    c = conn.cursor()
    c.execute("SELECT 1 FROM users WHERE email = ?", (email,))
    data = c.fetchone()
    conn.close()
    return data is not None

# ----------------- Rate Limiting -----------------
MAX_ATTEMPTS = 3
LOCKOUT_SECONDS = 60

def _record_failed_attempt(email):
    conn = sqlite3.connect(DB_NAME)
    c = conn.cursor()
    now = time.time()
    c.execute("SELECT attempts, last_attempt FROM login_attempts WHERE email = ?", (email,))
    row = c.fetchone()
    if row:
        attempts, last = row
        if now - last > LOCKOUT_SECONDS:
            c.execute("UPDATE login_attempts SET attempts = 1, last_attempt = ? WHERE email = ?", (now, email))
        else:
            c.execute("UPDATE login_attempts SET attempts = ?, last_attempt = ? WHERE email = ?", (attempts + 1, now, email))
    else:
        c.execute("INSERT INTO login_attempts (email, attempts, last_attempt) VALUES (?, 1, ?)", (email, now))
    conn.commit()
    conn.close()

def _reset_attempts(email):
    conn = sqlite3.connect(DB_NAME)
    c = conn.cursor()
    c.execute("DELETE FROM login_attempts WHERE email = ?", (email,))
    conn.commit()
    conn.close()

def is_rate_limited(email):
    conn = sqlite3.connect(DB_NAME)
    c = conn.cursor()
    c.execute("SELECT attempts, last_attempt FROM login_attempts WHERE email = ?", (email,))
    row = c.fetchone()
    conn.close()
    if row:
        attempts, last = row
        elapsed = time.time() - last
        if attempts >= MAX_ATTEMPTS and elapsed < LOCKOUT_SECONDS:
            return True, LOCKOUT_SECONDS - elapsed
    return False, 0

# ----------------- Admin -----------------
def get_all_users():
    conn = sqlite3.connect(DB_NAME)
    c = conn.cursor()
    c.execute("SELECT email, created_at FROM users ORDER BY created_at DESC")
    users = c.fetchall()
    conn.close()
    return users

def delete_user(email):
    conn = sqlite3.connect(DB_NAME)
    c = conn.cursor()
    c.execute("DELETE FROM password_history WHERE email = ?", (email,))
    c.execute("DELETE FROM login_attempts WHERE email = ?", (email,))
    c.execute("DELETE FROM users WHERE email = ?", (email,))
    conn.commit()
    conn.close()


# ================= OTP SUPPORT =================
import random

_otp_store = {}   # in-memory (fine for demo / college project)

def send_otp(email):
    otp = str(random.randint(100000, 999999))
    _otp_store[email] = {
        "otp": otp,
        "time": time.time()
    }
    # TEMP: print OTP (email sending can be added later)
    print(f"[DEBUG OTP] {email} -> {otp}")


def verify_otp(email, otp):
    if email not in _otp_store:
        return False

    data = _otp_store[email]

    # OTP valid for 5 minutes
    if time.time() - data["time"] > 300:
        del _otp_store[email]
        return False

    if data["otp"] == otp:
        del _otp_store[email]
        return True

    return False


# ================= SECURITY QUESTION HELPERS =================
def get_security_question_only(email):
    conn = sqlite3.connect(DB_NAME)
    c = conn.cursor()
    c.execute("SELECT question FROM security_questions WHERE email = ?", (email,))
    row = c.fetchone()
    conn.close()
    return row[0] if row else None


def verify_security_answer(email, answer):
    conn = sqlite3.connect(DB_NAME)
    c = conn.cursor()
    c.execute("SELECT answer_hash FROM security_questions WHERE email = ?", (email,))
    row = c.fetchone()
    conn.close()

    if not row:
        return False

    stored_hash = row[0]
    if isinstance(stored_hash, memoryview):
        stored_hash = stored_hash.tobytes()

    return bcrypt.checkpw(answer.encode(), stored_hash)


Overwriting db.py


In [None]:
%%writefile readability.py
import textstat

class ReadabilityAnalyzer:
    def __init__(self, text):
        self.text = text
        self.num_sentences = textstat.sentence_count(text)
        self.num_words = textstat.lexicon_count(text, removepunct=True)
        self.num_syllables = textstat.syllable_count(text)
        self.complex_words = textstat.difficult_words(text)
        self.char_count = textstat.char_count(text)

    def get_all_metrics(self):
        return {
            "Flesch Reading Ease": textstat.flesch_reading_ease(self.text),
            "Flesch-Kincaid Grade": textstat.flesch_kincaid_grade(self.text),
            "SMOG Index": textstat.smog_index(self.text),
            "Gunning Fog": textstat.gunning_fog(self.text),
            "Coleman-Liau": textstat.coleman_liau_index(self.text)
        }

Overwriting readability.py


In [None]:
%%writefile app.py
import streamlit as st
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import secrets, bcrypt, jwt, datetime, time, os, re, hmac, hashlib, struct
import db
from streamlit_option_menu import option_menu
import plotly.graph_objects as go
import PyPDF2
import textstat
import readability


# --- Configuration ---
EMAIL_PASSWORD = os.getenv("EMAIL_PASSWORD")
SECRET_KEY = os.getenv("JWT_SECRET", "super-secret-key-change-this")
EMAIL_ADDRESS = "aartichandolkar7@gmail.com"
OTP_EXPIRY_MINUTES = 10

# --- Database Init ---
if 'db_initialized' not in st.session_state:
    db.init_db()
    st.session_state['db_initialized'] = True

# --- UI Theme ---
st.set_page_config(page_title="Infosys LLM", page_icon="üìù", layout="wide")
st.markdown("""
<style>
.stApp { background-color: #f5f7fa; color: #0f1e2e; }
h1, h2, h3 { color: #0f1e2e; font-family: 'Verdana', sans-serif; }
.stTextInput > div > div > input, .stTextArea > div > div > textarea {
    background-color: #ffffff; color: #0f1e2e; border: 1px solid #9ca3af; border-radius: 6px;
}
.stButton > button {
    background: linear-gradient(90deg, #4f46e5, #6366f1); color: white; border-radius: 8px; width: 100%;
    font-weight: bold;
}
.stButton > button:hover { background: linear-gradient(90deg, #6366f1, #4f46e5); }
section[data-testid="stSidebar"] { background-color: #e0f2f1; color: #0f1e2e; }

.stButton > button:has(span:contains("Back")) {
    background-color: #64748b !important;
</style>
""", unsafe_allow_html=True)

# --- Helpers ---
def is_valid_email(email):
    return re.match(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$", email)

def check_password_strength(password):
    has_upper = bool(re.search(r"[A-Z]", password))
    has_lower = bool(re.search(r"[a-z]", password))
    has_digit = bool(re.search(r"\d", password))
    has_special = bool(re.search(r"[!@#$%^&*(),.?\":{}|<>]", password))
    if len(password) >= 8 and (has_upper or has_lower) and has_digit: return "Strong", []
    if len(password) >= 6: return "Medium", ["Add more characters for strong"]
    return "Weak", ["Password too short"]

# --- OTP Helpers ---
def generate_otp():
    secret = secrets.token_bytes(20)
    counter = int(time.time())
    msg = struct.pack(">Q", counter)
    h = hmac.new(secret, msg, hashlib.sha1).digest()
    o = h[19] & 0xf
    code = ((h[o] & 0x7f)<<24 | (h[o+1]&0xff)<<16 | (h[o+2]&0xff)<<8 | (h[o+3]&0xff)) % 1000000
    return f"{code:06d}"

def create_otp_token(otp, email):
    otp_hash = bcrypt.hashpw(otp.encode(), bcrypt.gensalt()).decode()
    payload = {'otp_hash': otp_hash, 'sub': email, 'type': 'password_reset',
               'iat': datetime.datetime.utcnow(),
               'exp': datetime.datetime.utcnow() + datetime.timedelta(minutes=OTP_EXPIRY_MINUTES)}
    return jwt.encode(payload, SECRET_KEY, algorithm='HS256')

def verify_otp_token(token, input_otp, email):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=['HS256'])
        if payload.get('sub') != email or payload.get('type') != 'password_reset':
            return False, "Invalid token"
        if bcrypt.checkpw(input_otp.encode(), payload['otp_hash'].encode()):
            return True, "Valid OTP"
        return False, "Incorrect OTP"
    except jwt.ExpiredSignatureError: return False, "OTP expired"
    except jwt.InvalidTokenError: return False, "Invalid token"

def send_email(to_email, otp, app_pass=None):
    msg = MIMEMultipart()
    msg['From'] = EMAIL_ADDRESS; msg['To'] = to_email; msg['Subject'] = "Infosys LLM OTP"
    msg.attach(MIMEText(f"Your OTP: {otp}", 'plain'))
    try:
        server = smtplib.SMTP('smtp.gmail.com', 587)
        server.starttls()
        server.login(EMAIL_ADDRESS, app_pass or EMAIL_PASSWORD)
        server.sendmail(EMAIL_ADDRESS, to_email, msg.as_string())
        server.quit(); return True, "Email sent"
    except Exception as e: return False, str(e)

# --- Gauge Helper for Readability ---
def create_gauge(value, title, min_val=0, max_val=100, color="#00bfa6"):
    fig = go.Figure(go.Indicator(mode="gauge+number", value=value,
                title={'text': title, 'font': {'color': color}},
                gauge={'axis': {'range':[min_val,max_val]}, 'bar': {'color': color}, 'bgcolor': "#e0f2f1"}))
    fig.update_layout(paper_bgcolor="#f5f7fa", height=250, margin=dict(l=10,r=10,t=40,b=10))
    return fig

# --- Session State & Routing ---
if 'user' not in st.session_state: st.session_state['user'] = None
if 'page' not in st.session_state: st.session_state['page'] = 'login'
def switch_page(page): st.session_state['page']=page
def logout(): st.session_state['user']=None; st.session_state['page']='login'
def go_to_login():
    st.session_state['page'] = 'login'
    st.session_state.pop('stage', None)
    st.session_state.pop('reset_email', None)
    st.session_state.pop('method', None)
    st.rerun()

# ---------------- REGISTRATION ----------------
def register_page():
    st.title("üîê Create New Account")

        # üîô Back to Login
    if st.button("‚¨Ö Back to Login"):
        st.session_state['page'] = 'login'
        st.rerun()

    st.markdown("---")
    email = st.text_input("Email *")
    password = st.text_input("Password *", type='password')
    confirm = st.text_input("Confirm Password *", type='password')

    question_list = ["Your first pet's name?", "Mother's maiden name?", "Favorite color?", "Birth city?"]
    question = st.selectbox("Select Security Question *", question_list)
    answer = st.text_input("Answer *", type='password')

    if st.button("Register", key="register_btn"):
        if not email or not password or not confirm or not answer:
            st.error("All fields are required (*)")
        elif password != confirm:
            st.error("Passwords do not match")
        elif not is_valid_email(email):
            st.error("Invalid email format")
        else:
            strength, feedback = check_password_strength(password)
            if strength == "Weak": st.error(f"Password too weak: {feedback}")
            elif db.register_user(email, password):
                db.save_security_question(email, question, answer)
                st.success("Registration Successful! Redirecting to login...")
                st.session_state['page'] = 'login'
            else:
                st.error("Email already exists")

# ---------------- FORGOT PASSWORD ----------------
def forgot_page():
    st.title("üîë Forgot Password")

     # üîô Back to Login button (ALWAYS visible)
    if st.button("‚¨Ö Back to Login"):
        go_to_login()

    st.markdown("---")

    # Initialize stage and method
    if 'stage' not in st.session_state: st.session_state['stage'] = 'choose_method'
    if 'reset_email' not in st.session_state: st.session_state['reset_email'] = ''
    if 'method' not in st.session_state: st.session_state['method'] = ''

    # -------- CHOOSE METHOD --------
    if st.session_state['stage'] == 'choose_method':
        st.subheader("Choose how you want to reset your password")
        col1, col2 = st.columns(2)
        with col1:
            if st.button("üíå OTP via Email", key="otp_btn", help="Send OTP to your registered email"):
                st.session_state['method'] = 'otp'
                st.session_state['stage'] = 'enter_email'
                st.rerun()
        with col2:
            if st.button("‚ùì Security Question", key="sq_btn", help="Answer your security question"):
                st.session_state['method'] = 'security_question'
                st.session_state['stage'] = 'enter_email'
                st.rerun()

    # -------- ENTER EMAIL --------
    elif st.session_state['stage'] == 'enter_email':
        email = st.text_input("Enter your registered email *", value=st.session_state['reset_email'])
        if st.button("Next"):
            if not email:
                st.error("Email is required")
            elif not is_valid_email(email):
                st.error("Invalid email format")
            else:
                st.session_state['reset_email'] = email
                if st.session_state['method'] == 'otp':
                    st.session_state['stage'] = 'otp_send'
                else:
                    st.session_state['stage'] = 'security_question'
                st.rerun()

    # -------- OTP SEND & VERIFY --------
    elif st.session_state['stage'] == 'otp_send':
        email = st.session_state['reset_email']
        if 'token' not in st.session_state:
            if st.button("Send OTP", key="send_otp"):
                otp = generate_otp()
                success, msg = send_email(email, otp)
                if success:
                    st.session_state['token'] = create_otp_token(otp, email)
                    st.success("OTP sent! Check your email.")
                else:
                    st.error(f"Failed to send OTP: {msg}")

        otp_input = st.text_input("Enter OTP *")
        if st.button("Verify OTP", key="verify_otp"):
            if not otp_input:
                st.error("OTP cannot be empty")
            else:
                valid, msg = verify_otp_token(st.session_state.get('token',''), otp_input, email)
                if valid:
                    st.success("OTP Verified! Enter new password below.")
                    st.session_state['stage'] = 'reset'
                else:
                    st.error(msg)

    # -------- SECURITY QUESTION --------
    elif st.session_state['stage'] == 'security_question':
        email = st.session_state['reset_email']
        try:
            question, stored_hash = db.get_security_question(email)
        except Exception as e:
            st.error("Could not retrieve security question. Make sure your email exists.")
            return
        st.info(f"Question: {question}")
        answer = st.text_input("Answer *", type='password')
        if st.button("Verify Answer", key="verify_answer"):
            if not answer:
                st.error("Answer cannot be empty")
            elif bcrypt.checkpw(answer.encode(), stored_hash):
                st.success("Correct! Enter new password below.")
                st.session_state['stage'] = 'reset'
            else:
                st.error("Incorrect answer")

    # -------- RESET PASSWORD --------
    elif st.session_state['stage'] == 'reset':
        email = st.session_state['reset_email']
        p1 = st.text_input("New Password *", type='password')
        p2 = st.text_input("Confirm New Password *", type='password')
        if st.button("Update Password", key="update_password"):
            if not p1 or not p2:
                st.error("Both password fields are required")
            elif p1 != p2:
                st.error("Passwords do not match")
            else:
                db.update_password(email, p1)
                st.success("Password updated successfully!")
                st.session_state['stage'] = 'choose_method'
                st.session_state['reset_email'] = ''
                st.session_state['method'] = ''
                if 'token' in st.session_state: del st.session_state['token']

# ---------------- LOGIN ----------------
def login_page():
    st.title("Infosys LLM Login")
    with st.form("login_form"):
        email = st.text_input("Email *")
        password = st.text_input("Password *", type="password")
        if st.form_submit_button("Login"):
            if db.authenticate_user(email, password):
                st.session_state['user'] = email
                st.success(f"Welcome {email}!")
            else:
                st.error("Invalid email or password")
    col1, col2 = st.columns(2)
    with col1:
        if st.button("Create Account", key="create_acc_btn"):
            switch_page("register")
    with col2:
        if st.button("Forgot Password", key="forgot_pw_btn"):
            switch_page("forgot")

# ---------------- CHAT ----------------
def chat_page():
    if not st.session_state['user']: switch_page("login"); return
    st.title("Chat Page")
    if "messages" not in st.session_state: st.session_state.messages=[]
    for msg in st.session_state.messages:
        with st.chat_message(msg["role"]): st.markdown(msg["content"])
    if prompt := st.chat_input("Ask me anything"):
        st.session_state.messages.append({"role":"user","content":prompt})
        with st.chat_message("assistant"):
            response=f"Simulated response: {prompt}"
            st.markdown(response)
            st.session_state.messages.append({"role":"assistant","content":response})

# ---------------- READABILITY ----------------
def readability_page():
    if not st.session_state['user']: switch_page('login'); return
    st.title("üìñ Text Readability Analyzer")
    tab1,tab2=st.tabs(["‚úçÔ∏è Input Text","üìÇ Upload TXT/PDF"])
    text_input=""
    with tab1: raw_text=st.text_area("Enter text (min 50 chars)",height=200); text_input=raw_text
    with tab2:
        uploaded_file=st.file_uploader("Upload file",type=["txt","pdf"])
        if uploaded_file:
            try:
                if uploaded_file.type=="application/pdf":
                    reader=PyPDF2.PdfReader(uploaded_file); text=""
                    for page in reader.pages: text+=page.extract_text()+"\n"
                    text_input=text; st.info(f"‚úÖ Loaded {len(reader.pages)} pages from PDF.")
                else: text_input=uploaded_file.read().decode("utf-8"); st.info(f"‚úÖ Loaded {uploaded_file.name}")
            except Exception as e: st.error(f"Error reading file: {e}")

    if st.button("Analyze Readability"):
        if len(text_input)<50: st.error("Text too short!")
        else:
            analyzer=readability.ReadabilityAnalyzer(text_input)
            metrics=analyzer.get_all_metrics()
            st.markdown("---"); st.subheader("üìä Metrics")
            for metric,color,info in [("Flesch Reading Ease","#4f46e5","0-100, higher easier"),
                                     ("Flesch-Kincaid Grade","#6366f1","US grade level"),
                                     ("SMOG Index","#facc15","Polysyllable difficulty"),
                                     ("Gunning Fog","#10b981","Sentence+Complex words"),
                                     ("Coleman-Liau","#f97316","Based on characters")]:
                st.plotly_chart(create_gauge(metrics[metric],metric,0,20 if metric!="Flesch Reading Ease" else 100,color),use_container_width=True)
                with st.expander(f"‚ÑπÔ∏è About {metric}"): st.caption(info)
            st.markdown("### üìù Text Stats")
            s1,s2,s3,s4,s5=st.columns(5)
            s1.metric("Sentences",analyzer.num_sentences)
            s2.metric("Words",analyzer.num_words)
            s3.metric("Syllables",analyzer.num_syllables)
            s4.metric("Complex Words",analyzer.complex_words)
            s5.metric("Characters",analyzer.char_count)

# ---------------- MAIN ROUTING ----------------
if st.session_state['user']:
    with st.sidebar:
        st.markdown(f"üë§ {st.session_state['user']}")
        opts=["Chat","Readability"]; icons=["chat-dots","book"]
        if st.session_state['user']=="admin@llm.com": opts.append("Admin"); icons.append("shield-lock")
        selected=option_menu("Menu", opts, icons=icons, default_index=0)
        if st.button("Log Out"): logout()
    if selected=="Chat": chat_page()
    elif selected=="Readability": readability_page()
else:
    if st.session_state['page']=="login": login_page()
    elif st.session_state['page']=="register": register_page()
    elif st.session_state['page']=="forgot": forgot_page()



Overwriting app.py


In [None]:
!pkill -f streamlit
!pkill -f ngrok

In [None]:
%%writefile run_colab.py
import os, subprocess, time
from pyngrok import ngrok

# --- Retrieve secrets from Colab ---
EMAIL_PASS = os.getenv("EMAIL_PASSWORD")
NGROK_TOKEN = os.getenv("NGROK_AUTHTOKEN")
if EMAIL_PASS: os.environ['EMAIL_PASSWORD']=EMAIL_PASS
os.environ['JWT_SECRET']="super-secret-change-me"

# --- Authenticate Ngrok & kill old tunnels ---
if NGROK_TOKEN:
    ngrok.set_auth_token(NGROK_TOKEN)
    ngrok.kill(); time.sleep(1)
    process = subprocess.Popen(['streamlit','run','app.py'], env=os.environ.copy())
    time.sleep(3)
    try:
        url=ngrok.connect(8501).public_url
        print(f"App Running: {url}")
    except Exception as e: print(e)
    try: input("Press ENTER to stop"); process.terminate(); ngrok.kill()
    except: process.terminate(); ngrok.kill()
else:
    print("‚ùå NGROK token missing")

Overwriting run_colab.py


In [None]:
!rm -f users.db

In [None]:
import os
import subprocess
import time
from google.colab import userdata
from pyngrok import ngrok

# 1. Retrieve secrets safely in the kernel
email_pass = None
ngrok_token = None

try:
    try:
        email_pass = userdata.get('EMAIL_PASSWORD')
    except Exception as e:
        print(f"‚ö†Ô∏è Warning: EMAIL_PASSWORD secret not found: {e}")

    try:
        ngrok_token = userdata.get('NGROK_AUTHTOKEN')
    except Exception as e:
        print(f"‚ö†Ô∏è Warning: NGROK_AUTHTOKEN secret not found: {e}")

    # Store in environment for the subprocess to see
    if email_pass:
        os.environ['EMAIL_PASSWORD'] = email_pass
    os.environ['JWT_SECRET'] = "super-secret-change-me"

except Exception as e:
    print(f"‚ùå Error setting up environment: {e}")

# 2. Authenticate Ngrok
if ngrok_token:
    ngrok.set_auth_token(ngrok_token)

    # 3. Kill old processes to prevent conflicts
    ngrok.kill()
    time.sleep(1)

    # 4. Run Streamlit in the background
    # We pass os.environ so it inherits the secrets we just set
    process = subprocess.Popen(['streamlit', 'run', 'app.py'], env=os.environ.copy())

    # Give it a moment to start
    time.sleep(3)

    # 5. Open Tunnel
    try:
        public_url = ngrok.connect(8501).public_url
        print(f"\nüöÄ Your App is running at: {public_url}")
        print("\nüëá Click the link above to open the app!")
    except Exception as e:
        print(f"‚ùå Error starting Ngrok tunnel: {e}")

    # 6. Keep Alive / Manual Stop
    print("\n‚úÖ App is running! Check the URL above.")
    try:
        input("\nüõë Press ENTER in this box to STOP the server...\n")
    except (KeyboardInterrupt, EOFError):
        pass
    finally:
        process.terminate()
        ngrok.kill()
        print("‚úÖ Server and Tunnel stopped.")

else:
    print("‚ùå No Ngrok Token found. Please add 'NGROK_AUTHTOKEN' to Colab Secrets.")



üöÄ Your App is running at: https://imprecatory-bradford-arboresque.ngrok-free.dev

üëá Click the link above to open the app!

‚úÖ App is running! Check the URL above.
