<a href="https://colab.research.google.com/github/PrinceChaudhary1962/E-Commerce-Website/blob/main/Untitled8.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Cell 1: Install dependencies
!pip install streamlit sqlalchemy passlib qrcode pillow email-validator pandas
# optional: if you want to tunnel for local testing
!npm install -g localtunnel


[1G[0K⠙[1G[0K⠹[1G[0K⠸[1G[0K⠼[1G[0K⠴[1G[0K⠦[1G[0K⠧[1G[0K⠇[1G[0K⠏[1G[0K⠋[1G[0K⠙[1G[0K⠹[1G[0K⠸[1G[0K⠼[1G[0K⠴[1G[0K⠦[1G[0K⠧[1G[0K⠇[1G[0K⠏[1G[0K⠋[1G[0K⠙[1G[0K⠹[1G[0K⠸[1G[0K⠼[1G[0K⠴[1G[0K
changed 22 packages in 3s
[1G[0K⠴[1G[0K
[1G[0K⠴[1G[0K3 packages are looking for funding
[1G[0K⠴[1G[0K  run `npm fund` for details
[1G[0K⠴[1G[0K

In [2]:
%%writefile models.py
from sqlalchemy import (create_engine, Column, Integer, String, Float, Boolean,
                        ForeignKey, DateTime, Text)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
import datetime

# NOTE: check_same_thread False is helpful for SQLite in threaded apps (Streamlit)
engine = create_engine("sqlite:///database.db", connect_args={"check_same_thread": False})
Base = declarative_base()
SessionLocal = sessionmaker(bind=engine)

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, unique=True, index=True, nullable=False)
    password_hash = Column(String, nullable=False)   # hashed password
    is_verified = Column(Boolean, default=False)
    role = Column(String, default="customer")   # "admin" or "customer"
    created_at = Column(DateTime, default=datetime.datetime.utcnow)

    addresses = relationship("Address", back_populates="user", cascade="all, delete-orphan")
    orders = relationship("Order", back_populates="user", cascade="all, delete-orphan")

class OTP(Base):
    __tablename__ = "otps"
    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, index=True)
    code = Column(String)
    expires_at = Column(DateTime)

class Product(Base):
    __tablename__ = "products"
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, unique=True, nullable=False)
    price = Column(Float, nullable=False)
    description = Column(Text)

class Address(Base):
    __tablename__ = "addresses"
    id = Column(Integer, primary_key=True, index=True)
    user_id = Column(Integer, ForeignKey("users.id"))
    label = Column(String)  # e.g., "Home", "Office"
    address = Column(Text)
    phone = Column(String)

    user = relationship("User", back_populates="addresses")

class Order(Base):
    __tablename__ = "orders"
    id = Column(Integer, primary_key=True, index=True)
    user_id = Column(Integer, ForeignKey("users.id"))
    total = Column(Float, default=0.0)
    address_id = Column(Integer, ForeignKey("addresses.id"), nullable=True)
    status = Column(String, default="pending")  # pending, paid, shipped
    created_at = Column(DateTime, default=datetime.datetime.utcnow)

    user = relationship("User", back_populates="orders")
    items = relationship("OrderItem", back_populates="order", cascade="all, delete-orphan")

class OrderItem(Base):
    __tablename__ = "order_items"
    id = Column(Integer, primary_key=True, index=True)
    order_id = Column(Integer, ForeignKey("orders.id"))
    product_id = Column(Integer, ForeignKey("products.id"))
    qty = Column(Integer, default=1)
    price = Column(Float)

    order = relationship("Order", back_populates="items")

# Create all tables if not present
Base.metadata.create_all(engine)


Overwriting models.py


In [3]:
%%writefile utils.py
import os
import smtplib
import random
import datetime
from email.message import EmailMessage
from passlib.hash import bcrypt
import qrcode
from io import BytesIO
from urllib.parse import quote_plus

from models import SessionLocal, OTP

DB = SessionLocal()

# --- Password hashing ---
def hash_password(password: str) -> str:
    return bcrypt.hash(password)

def verify_password(password: str, hash_val: str) -> bool:
    return bcrypt.verify(password, hash_val)

# --- OTP handling ---
def generate_otp_code(length=6):
    return "".join([str(random.randint(0, 9)) for _ in range(length)])

def create_and_send_otp(email):
    code = generate_otp_code()
    expires_at = datetime.datetime.utcnow() + datetime.timedelta(minutes=10)
    # store in DB
    otp = OTP(email=email, code=code, expires_at=expires_at)
    DB.add(otp)
    DB.commit()
    # send email (uses env vars or streamlit secrets)
    sent = send_email(to=email, subject="Your OTP code", body=f"Your OTP code is: {code}\nIt expires in 10 minutes.")
    return {"code": code, "sent": sent}

def verify_otp(email, code):
    now = datetime.datetime.utcnow()
    q = DB.query(OTP).filter(OTP.email == email, OTP.code == code, OTP.expires_at >= now).first()
    if q:
        # delete OTP after use
        DB.delete(q)
        DB.commit()
        return True
    return False

def cleanup_expired_otps():
    now = datetime.datetime.utcnow()
    DB.query(OTP).filter(OTP.expires_at < now).delete()
    DB.commit()

# --- Email sending ---
def send_email(to, subject, body):
    # Reads SMTP settings from env vars: SMTP_USER, SMTP_PASS, SMTP_HOST, SMTP_PORT, FROM_EMAIL
    SMTP_USER = os.environ.get("SMTP_USER")
    SMTP_PASS = os.environ.get("SMTP_PASS")
    SMTP_HOST = os.environ.get("SMTP_HOST", "smtp.gmail.com")
    SMTP_PORT = int(os.environ.get("SMTP_PORT", 587))
    FROM = os.environ.get("FROM_EMAIL", SMTP_USER)

    if not SMTP_USER or not SMTP_PASS:
        # For dev: print the OTP so you can continue testing locally
        print("SMTP credentials not set. Email not sent. (DEV) OTP would be printed here only for development.")
        return False

    msg = EmailMessage()
    msg["Subject"] = subject
    msg["From"] = FROM
    msg["To"] = to
    msg.set_content(body)

    with smtplib.SMTP(SMTP_HOST, SMTP_PORT) as server:
        server.starttls()
        server.login(SMTP_USER, SMTP_PASS)
        server.send_message(msg)
    return True

# --- UPI QR generation ---
def generate_upi_qr(vpa, name, amount, note="Payment"):
    # encode fields safely
    pa = quote_plus(vpa)
    pn = quote_plus(name)
    am = quote_plus(str(amount))
    tn = quote_plus(note)
    uri = f"upi://pay?pa={pa}&pn={pn}&am={am}&tn={tn}"
    img = qrcode.make(uri)
    buf = BytesIO()
    img.save(buf, format="PNG")
    buf.seek(0)
    return buf


Overwriting utils.py


In [4]:
# Cell C: seed DB
from models import SessionLocal, User, Product
import utils
DB = SessionLocal()

# create an admin if not exists
if not DB.query(User).filter(User.email=="admin@example.com").first():
    admin = User(email="admin@example.com", password_hash=utils.hash_password("adminpass"), is_verified=True, role="admin")
    DB.add(admin)
    DB.commit()
    print("Admin created: admin@example.com / adminpass")
else:
    print("Admin already exists.")

# sample products (only add if none exist)
if DB.query(Product).count() == 0:
    DB.add_all([
        Product(name="T-Shirt", price=399.0, description="Comfortable cotton tee."),
        Product(name="Backpack", price=1499.0, description="Durable backpack for daily use."),
        Product(name="Headphones", price=2499.0, description="Wireless headphones.")
    ])
    DB.commit()
    print("Sample products added.")
else:
    print("Products already exist.")


Admin already exists.
Products already exist.


In [5]:
# Cell 3: create utils.py
%%writefile utils.py
import os
import smtplib
import random
import datetime
from email.message import EmailMessage
from passlib.hash import bcrypt
import qrcode
from io import BytesIO

from models import SessionLocal, OTP

DB = SessionLocal()

# --- Password hashing ---
def hash_password(password: str) -> str:
    return bcrypt.hash(password)

def verify_password(password: str, hash_val: str) -> bool:
    return bcrypt.verify(password, hash_val)

# --- OTP handling ---
def generate_otp_code(length=6):
    return "".join([str(random.randint(0, 9)) for _ in range(length)])

def create_and_send_otp(email):
    code = generate_otp_code()
    expires_at = datetime.datetime.utcnow() + datetime.timedelta(minutes=10)
    # store in DB
    otp = OTP(email=email, code=code, expires_at=expires_at)
    DB.add(otp)
    DB.commit()
    # send email (uses env vars or streamlit secrets)
    send_email(to=email, subject="Your OTP code", body=f"Your OTP code is: {code}\nIt expires in 10 minutes.")
    return code

def verify_otp(email, code):
    now = datetime.datetime.utcnow()
    q = DB.query(OTP).filter(OTP.email == email, OTP.code == code, OTP.expires_at >= now).first()
    if q:
        # delete OTP after use
        DB.delete(q)
        DB.commit()
        return True
    return False

# --- Email sending ---
def send_email(to, subject, body):
    # Reads SMTP settings from env vars: SMTP_USER, SMTP_PASS, SMTP_HOST, SMTP_PORT, FROM_EMAIL
    SMTP_USER = os.environ.get("SMTP_USER")
    SMTP_PASS = os.environ.get("SMTP_PASS")
    SMTP_HOST = os.environ.get("SMTP_HOST", "smtp.gmail.com")
    SMTP_PORT = int(os.environ.get("SMTP_PORT", 587))
    FROM = os.environ.get("FROM_EMAIL", SMTP_USER)

    if not SMTP_USER or not SMTP_PASS:
        print("Warning: SMTP credentials not set. OTP emails will not be sent. Set SMTP_USER and SMTP_PASS.")
        return False

    msg = EmailMessage()
    msg["Subject"] = subject
    msg["From"] = FROM
    msg["To"] = to
    msg.set_content(body)

    with smtplib.SMTP(SMTP_HOST, SMTP_PORT) as server:
        server.starttls()
        server.login(SMTP_USER, SMTP_PASS)
        server.send_message(msg)
    return True

# --- UPI QR generation ---
def generate_upi_qr(vpa, name, amount, note="Payment"):
    # simple UPI deep link payload
    # upi://pay?pa=<VPA>&pn=<name>&am=<amount>&tn=<note>
    uri = f"upi://pay?pa={vpa}&pn={name}&am={amount}&tn={note}"
    img = qrcode.make(uri)
    buf = BytesIO()
    img.save(buf, format="PNG")
    buf.seek(0)
    return buf



Overwriting utils.py


In [6]:
# Cell 4: create app.py
%%writefile app.py
import streamlit as st
from models import SessionLocal, Product, User, Address, Order, OrderItem
import utils
import os
import datetime

DB = SessionLocal()

st.set_page_config(page_title="Mini E-Commerce", layout="wide")

# -------------------
# Helper UI functions
# -------------------
def get_current_user():
    return st.session_state.get("user")  # holds user email or user dict

def require_login():
    if not get_current_user():
        st.warning("Please login or register to continue.")
        st.stop()

def load_user_obj(email):
    return DB.query(User).filter(User.email == email).first()

def refresh_products():
    return DB.query(Product).all()

# -------------------
# Authentication UI
# -------------------
def signup_flow():
    st.subheader("Sign up")
    email = st.text_input("Email", key="su_email")
    password = st.text_input("Password", type="password", key="su_pass")
    role = st.selectbox("Role", ["customer", "admin"], key="su_role")
    if st.button("Send OTP", key="send_otp"):
        if DB.query(User).filter(User.email == email).first():
            st.error("User already exists. Please login.")
        else:
            # create OTP and send
            utils.create_and_send_otp(email)
            st.success("OTP sent to your email (if SMTP configured). Enter OTP below to verify.")
            st.session_state["pending_signup"] = {"email": email, "password": password, "role": role}

    if "pending_signup" in st.session_state:
        otp = st.text_input("Enter OTP", key="signup_otp")
        if st.button("Verify & Create", key="verify_create"):
            pending = st.session_state.get("pending_signup")
            if pending and utils.verify_otp(pending["email"], otp):
                u = User(email=pending["email"], password_hash=utils.hash_password(pending["password"]),
                         is_verified=True, role=pending["role"])
                DB.add(u)
                DB.commit()
                st.success("Account created and verified. Please login.")
                del st.session_state["pending_signup"]
            else:
                st.error("Invalid or expired OTP.")

def login_flow():
    st.subheader("Login")
    email = st.text_input("Email", key="li_email")
    password = st.text_input("Password", type="password", key="li_pass")
    if st.button("Login", key="login_btn"):
        u = load_user_obj(email)
        if not u:
            st.error("User not found. Please sign up.")
            return
        if not utils.verify_password(password, u.password_hash):
            st.error("Incorrect password.")
            return
        if not u.is_verified:
            st.error("Account not verified. Please sign up and verify OTP.")
            return
        # set session
        st.session_state["user"] = {"email": u.email, "role": u.role}
        st.success(f"Logged in as {u.email} ({u.role})")

def logout():
    if st.button("Logout"):
        st.session_state.pop("user", None)
        st.session_state.pop("cart", None)
        st.success("Logged out.")

# -------------------
# Admin pages
# -------------------
def admin_page():
    st.header("Admin Dashboard")
    st.subheader("Add product")
    name = st.text_input("Product name", key="p_name")
    price = st.number_input("Price", min_value=0.0, format="%.2f", key="p_price")
    desc = st.text_area("Description", key="p_desc")
    if st.button("Add product", key="add_p"):
        if not name:
            st.error("Name required.")
        else:
            p = Product(name=name, price=price, description=desc)
            DB.add(p)
            DB.commit()
            st.success("Product added.")
    st.subheader("Existing products")
    products = refresh_products()
    for p in products:
        st.write(f"**{p.name}** — ₹{p.price:.2f}")
        if st.button(f"Delete {p.id}", key=f"del_{p.id}"):
            DB.delete(p)
            DB.commit()
            st.success("Deleted. Refreshing...")
            st.experimental_rerun()

# -------------------
# Customer pages
# -------------------
def customer_page():
    st.header("Shop")
    products = refresh_products()
    cols = st.columns(3)
    for i, p in enumerate(products):
        with cols[i % 3]:
            st.subheader(p.name)
            st.write(p.description)
            st.write(f"Price: ₹{p.price:.2f}")
            if st.button("Add to cart", key=f"add_{p.id}"):
                cart = st.session_state.get("cart", [])
                cart.append({"id": p.id, "name": p.name, "price": p.price, "qty": 1})
                st.session_state["cart"] = cart
                st.success("Added to cart.")

    st.sidebar.header("Your Cart")
    cart = st.session_state.get("cart", [])
    if cart:
        total = sum([item["price"] * item.get("qty",1) for item in cart])
        for idx, item in enumerate(cart):
            st.sidebar.write(f"{item['name']} x {item.get('qty',1)} — ₹{item['price']*item.get('qty',1):.2f}")
            if st.sidebar.button(f"Remove {idx}", key=f"rm_{idx}"):
                cart.pop(idx)
                st.session_state.cart = cart
                st.experimental_rerun()
        st.sidebar.write(f"**Total: ₹{total:.2f}**")
        if st.sidebar.button("Checkout"):
            st.session_state["checkout_total"] = total
            st.session_state["show_checkout"] = True
    else:
        st.sidebar.write("Cart is empty.")

    if st.session_state.get("show_checkout"):
        checkout_page()

def checkout_page():
    st.header("Checkout")
    user_email = get_current_user()["email"]
    user_obj = DB.query(User).filter(User.email==user_email).first()
    st.subheader("Choose delivery address or add new")
    for a in user_obj.addresses:
        st.write(f"**{a.label}** — {a.address} — {a.phone}")
        if st.button(f"Deliver to {a.id}", key=f"addr_{a.id}"):
            st.session_state["deliver_addr"] = a.id
            st.success("Address selected.")
    st.write("---")
    st.subheader("Add new address")
    label = st.text_input("Label", key="addr_label")
    address = st.text_area("Address", key="addr_text")
    phone = st.text_input("Phone", key="addr_phone")
    if st.button("Save address", key="save_addr"):
        ad = Address(user_id=user_obj.id, label=label, address=address, phone=phone)
        DB.add(ad)
        DB.commit()
        st.success("Address saved.")
        st.experimental_rerun()

    if st.session_state.get("deliver_addr"):
        total = st.session_state.get("checkout_total", 0.0)
        st.subheader(f"Pay ₹{total:.2f} via UPI")
        upi_vpa = os.environ.get("UPI_VPA", "your-vpa@bank")
        name = "Merchant"
        buf = utils.generate_upi_qr(upi_vpa, name, f"{total:.2f}", note="OrderPayment")
        st.image(buf)
        st.write("Scan this QR with your UPI app and complete the payment.")
        if st.button("Mark as Paid (simulate)", key="mark_paid"):
            # create order record
            order = Order(user_id=user_obj.id, total=total, address_id=st.session_state["deliver_addr"], status="paid")
            DB.add(order)
            DB.commit()
            # add items
            cart = st.session_state.get("cart", [])
            for it in cart:
                oi = OrderItem(order_id=order.id, product_id=it["id"], qty=it.get("qty",1), price=it["price"])
                DB.add(oi)
            DB.commit()
            st.success("Order placed and marked paid. Thank you!")
            # clear cart & checkout flags
            st.session_state.pop("cart", None)
            st.session_state.pop("show_checkout", None)
            st.session_state.pop("checkout_total", None)

# -------------------
# App Layout & Routes
# -------------------
st.title("🛍️ Mini E-Commerce (Streamlit)")

# Top bar: login/signup/logout
col1, col2 = st.columns([2,1])
with col1:
    st.write("Welcome to the mini store.")
with col2:
    if get_current_user():
        st.write(f"Signed in as: {get_current_user()['email']} ({get_current_user()['role']})")
        logout()
    else:
        tab = st.selectbox("Auth", ["Login", "Sign up"])
        if tab == "Login":
            login_flow()
        else:
            signup_flow()

# Main navigation
if get_current_user():
    role = get_current_user()["role"]
    if role == "admin":
        admin_page()
    else:
        customer_page()
else:
    st.info("Browse products (login to purchase).")
    # show products to guests
    products = DB.query(Product).all()
    for p in products:
        st.write(f"**{p.name}** — ₹{p.price:.2f}")
        st.write(p.description)


Overwriting app.py


In [7]:
# Cell 5: create requirements.txt and README.md
%%writefile requirements.txt
streamlit
sqlalchemy
passlib
qrcode
pillow
email-validator
pandas

%%writefile README.md
# Mini E-Commerce Streamlit App

Run locally:
1. Install dependencies: `pip install -r requirements.txt`
2. Set SMTP env vars (for OTP emails): SMTP_USER, SMTP_PASS, SMTP_HOST, SMTP_PORT, FROM_EMAIL
3. Set UPI_VPA env var to your UPI id (for QR generation).
4. Run: `streamlit run app.py`

Deployment:
- Push repo to GitHub.
- On Streamlit Cloud, connect repo and add secrets (SMTP_* and UPI_VPA).
- Select `app.py` as the entry point.


Overwriting requirements.txt


In [8]:
# Cell 6: seed DB with an admin (for testing) and sample products
from models import SessionLocal, User, Product
import utils
DB = SessionLocal()

# create an admin if not exists
if not DB.query(User).filter(User.email=="admin@example.com").first():
    admin = User(email="admin@example.com", password_hash=utils.hash_password("adminpass"), is_verified=True, role="admin")
    DB.add(admin)
    DB.commit()
    print("Admin created: admin@example.com / adminpass")
else:
    print("Admin already exists.")

# sample products
if DB.query(Product).count() == 0:
    DB.add_all([
        Product(name="T-Shirt", price=399.0, description="Comfortable cotton tee."),
        Product(name="Backpack", price=1499.0, description="Durable backpack for daily use."),
        Product(name="Headphones", price=2499.0, description="Wireless headphones.")
    ])
    DB.commit()
    print("Sample products added.")
else:
    print("Products already exist.")


Admin already exists.
Products already exist.


In [9]:
# Cell 7: initialize git and push (edit remote URL)
!git init
!git add .
!git commit -m "Initial mini-ecommerce app"
# replace with your repo
!git branch -M main
# set your remote: replace the URL below
!git remote add origin https://github.com/YOUR_USERNAME/ecommerce-app.git
!git push -u origin main


Reinitialized existing Git repository in /content/.git/
Author identity unknown

*** Please tell me who you are.

Run

  git config --global user.email "you@example.com"
  git config --global user.name "Your Name"

to set your account's default identity.
Omit --global to set the identity only in this repository.

fatal: unable to auto-detect email address (got 'root@44c1dac27894.(none)')
error: remote origin already exists.
error: src refspec main does not match any
[31merror: failed to push some refs to 'https://github.com/YOUR_USERNAME/ecommerce-app.git'
[m