In [None]:
# @title Default ti

In [4]:
# @title Default title text
# ---------------------------
# Colab Notebook: Secure SQL / Anti-SQLi + AES-256 + Capability Codes
# ---------------------------

# 1) Install required packages
!pip -q install flask sqlalchemy cryptography pyjwt flask-limiter pandas

# 2) Imports
import os
import json
import time
from datetime import datetime, timedelta
from functools import wraps

from flask import Flask, request, jsonify, g
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address

import pandas as pd
from sqlalchemy import create_engine, Column, Integer, String, LargeBinary, DateTime, text
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy.exc import SQLAlchemyError

from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.backends import default_backend
import base64
import secrets
import jwt

# ---------------------------
# 3) Configuration (EDIT as needed)
# ---------------------------

# Google Drive paths (mount Drive in Colab before running or set accordingly)
INCOMING_CSV_PATH = "/content/drive/MyDrive/task2cloud/Modified_SQL_Dataset.csv"  # your uploaded dataset
DB_DIR = "/content/drive/MyDrive/task2cloud/database"  # Changed to a directory path
os.makedirs(DB_DIR, exist_ok=True)
DB_FILE = os.path.join(DB_DIR, "secure_master.db")

# Secrets (for demo only: set via environment variables / secret manager in production)
# Replace with a strong passphrase (store safely)
PASSPHRASE = os.environ.get("DEDUP_PASSPHRASE", "change_this_to_a_strong_passphrase")

# JWT secret (capability token signing key)
JWT_SECRET = os.environ.get("DEDUP_JWT_SECRET", "change_this_jwt_secret_please")

# AES params
PBKDF2_SALT = b"dedup_static_salt_please_change"  # change in production and store securely
PBKDF2_ITER = 200_000

# Flask config
FLASK_HOST = "0.0.0.0"
FLASK_PORT = 5000

# Define which columns are sensitive and must be encrypted in DB
SENSITIVE_COLUMNS = ["email", "phone", "ssn"]  # adjust to your dataset

# Query template whitelist (only these templates allowed)
# Each template is parameterized via :param placeholders (SQLAlchemy text binding).
QUERY_TEMPLATES = {
    "get_user_by_email": {
        "sql": "SELECT id, name, email_encrypted, email_nonce, created_at FROM users WHERE email_hash = :email_hash LIMIT 1",
        "requires": ["email_hash"]
    },
    # Add other read-only templates here; do NOT allow arbitrary SELECT with string interpolation
}

# ---------------------------
# 4) Helpers: AES-256-GCM encryption/decryption
# ---------------------------

def derive_key(passphrase: str, salt: bytes = PBKDF2_SALT, length: int = 32) -> bytes:
    # Derive a 32 byte (256-bit) key from passphrase using PBKDF2-HMAC-SHA256
    kdf = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=length,
        salt=salt,
        iterations=PBKDF2_ITER,
        backend=default_backend()
    )
    return kdf.derive(passphrase.encode("utf-8"))

MASTER_KEY = derive_key(PASSPHRASE)  # binary 32 bytes

def encrypt_aes_gcm(plaintext: str, key: bytes = MASTER_KEY) -> (bytes, bytes):
    """
    Returns (ciphertext, nonce). Nonce should be stored with ciphertext.
    """
    aesgcm = AESGCM(key)
    nonce = secrets.token_bytes(12)  # recommended 12 bytes for GCM
    ct = aesgcm.encrypt(nonce, plaintext.encode("utf-8"), None)
    return ct, nonce

def decrypt_aes_gcm(ciphertext: bytes, nonce: bytes, key: bytes = MASTER_KEY) -> str:
    aesgcm = AESGCM(key)
    pt = aesgcm.decrypt(nonce, ciphertext, None)
    return pt.decode("utf-8")

def b64(x: bytes) -> str:
    return base64.b64encode(x).decode('utf-8')

def ub64(s: str) -> bytes:
    return base64.b64decode(s.encode('utf-8'))

# ---------------------------
# 5) Lightweight DB model using SQLAlchemy (SQLite for demo)
# ---------------------------

Base = declarative_base()
engine = create_engine(f"sqlite:///{DB_FILE}", connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(bind=engine)

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String, nullable=True)
    # For encrypted email we store ciphertext and nonce separately and a simple hash for lookup
    email_encrypted = Column(LargeBinary, nullable=True)
    email_nonce = Column(LargeBinary, nullable=True)
    email_hash = Column(String, nullable=True, index=True)  # simple SHA256 hex for lookup
    phone_encrypted = Column(LargeBinary, nullable=True)
    phone_nonce = Column(LargeBinary, nullable=True)
    created_at = Column(DateTime, default=datetime.utcnow)

Base.metadata.create_all(bind=engine)

# ---------------------------
# 6) Utility: deterministic hash for lookup (not reversible)
# ---------------------------
import hashlib
def deterministic_hash(value: str) -> str:
    if value is None:
        return None
    h = hashlib.sha256(value.strip().lower().encode("utf-8")).hexdigest()
    return h

# ---------------------------
# 7) Loading dataset from Drive and inserting into secured DB (encrypt sensitive columns)
# ---------------------------

def load_and_merge_csv(path):
    if not os.path.exists(path):
        print("Dataset not found at:", path)
        return {"status": "error", "reason": "file-not-found"}
    df = pd.read_csv(path, low_memory=False)
    # Minimal normalization
    df = df.fillna("")
    # Insert into DB with encryption
    db = SessionLocal()
    inserted = 0
    for _, row in df.iterrows():
        try:
            u = User()
            u.name = row.get("name", "")
            email = str(row.get("email", "")).strip()
            if email:
                ct, nonce = encrypt_aes_gcm(email)
                u.email_encrypted = ct
                u.email_nonce = nonce
                u.email_hash = deterministic_hash(email)
            phone = str(row.get("phone", "")).strip()
            if phone:
                ct2, nonce2 = encrypt_aes_gcm(phone)
                u.phone_encrypted = ct2
                u.phone_nonce = nonce2
            db.add(u)
            inserted += 1
        except Exception as e:
            print("Insert error:", e)
    try:
        db.commit()
    except Exception as e:
        db.rollback()
        print("Commit error:", e)
    finally:
        db.close()
    return {"status": "ok", "inserted": inserted}

# ---------------------------
# 8) Capability tokens (JWT) generation & verification
# ---------------------------

def create_capability_token(subject: str, actions: list, ttl_minutes: int = 15) -> str:
    payload = {
        "sub": subject,
        "actions": actions,
        "exp": datetime.utcnow() + timedelta(minutes=ttl_minutes),
        "iat": datetime.utcnow(),
        "nonce": secrets.token_hex(8)
    }
    token = jwt.encode(payload, JWT_SECRET, algorithm="HS256")
    return token

def verify_capability_token(token: str, required_action: str = None) -> dict:
    """
    Verifies token and (optionally) that required_action is allowed.
    Returns payload on success, raises jwt exceptions on failure.
    """
    payload = jwt.decode(token, JWT_SECRET, algorithms=["HS256"])
    if required_action and required_action not in payload.get("actions", []):
        raise jwt.InvalidTokenError("action-not-allowed")
    return payload

# ---------------------------
# 9) Flask app with two-layer protection:
#    - Layer 1: capability token required + jwt action check
#    - Layer 2: query template whitelist + parameterized query via SQLAlchemy text()
# ---------------------------

app = Flask(__name__)
limiter = Limiter(app, key_func=get_remote_address, default_limits=["60/minute"])


# Simple auth decorator
def require_capability(action):
    def decorator(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            auth = request.headers.get("Authorization", "")
            if not auth.startswith("Bearer "):
                return jsonify({"error": "Missing capability token"}), 401
            token = auth[len("Bearer "):].strip()
            try:
                payload = verify_capability_token(token, required_action=action)
                g.capability = payload
            except jwt.ExpiredSignatureError:
                return jsonify({"error": "Capability token expired"}), 401
            except Exception as e:
                return jsonify({"error": "Invalid capability token", "reason": str(e)}), 403
            return f(*args, **kwargs)
        return wrapper
    return require_capability

@app.route("/admin/generate_capability", methods=["POST"])
@limiter.limit("10/minute")
def admin_generate():
    """
    Admin endpoint to generate capability tokens.
    For demo: protected by a master passphrase in the request body (NOT for prod)
    Request JSON: { "admin_passphrase": "...", "subject":"clientA", "actions":["read"], "ttl_minutes":30 }
    """
    j = request.get_json() or {}
    master = j.get("admin_passphrase")
    if master != PASSPHRASE:
        return jsonify({"error": "admin_auth_failed"}), 403
    subject = j.get("subject", "unknown")
    actions = j.get("actions", ["read"])
    ttl = int(j.get("ttl_minutes", 15))
    token = create_capability_token(subject, actions, ttl)
    return jsonify({"capability_token": token})

@app.route("/query", methods=["POST"])
@limiter.limit("120/minute")
@require_capability("read")
def run_query():
    """
    Executes a whitelisted parameterized query. Body:
    { "template": "get_user_by_email", "params": { "email": "user@example.com" } }
    """
    j = request.get_json() or {}
    template_name = j.get("template")
    params = j.get("params", {})

    if template_name not in QUERY_TEMPLATES:
        return jsonify({"error": "template-not-found"}), 400
    template = QUERY_TEMPLATES[template_name]
    # Validate required params exist
    for req in template.get("requires", []):
        if req not in params and req not in j.get("params_transformed", {}):
            # allow transformed derived params
            if req == "email_hash" and "email" in params:
                # derive
                params["email_hash"] = deterministic_hash(params["email"])
            else:
                return jsonify({"error": f"missing-param-{req}"}), 400

    # Important: only pass parameters to SQLAlchemy text binding - prevent direct string interpolation
    sql_text = text(template["sql"])
    db = SessionLocal()
    try:
        result = db.execute(sql_text, params).fetchone()
        if not result:
            return jsonify({"found": False}), 200
        row = dict(result)
        # decrypt sensitive fields for response if capability allows
        out = {"id": row.get("id"), "name": row.get("name"), "created_at": str(row.get("created_at"))}
        if row.get("email_encrypted"):
            try:
                email_plain = decrypt_aes_gcm(row["email_encrypted"], row["email_nonce"])
                out["email"] = email_plain
            except Exception as e:
                out["email_decryption_error"] = str(e)
        return jsonify({"found": True, "row": out})
    except SQLAlchemyError as e:
        return jsonify({"error": "query-execution-failed", "detail": str(e)}), 500
    finally:
        db.close()

@app.route("/health", methods=["GET"])
def health():
    return jsonify({"status": "ok", "time": str(datetime.utcnow())})

# ---------------------------
# 10) Optional helper: run Flask app (in Colab you can test locally)
# ---------------------------

def run_app():
    print(f"Flask app will run on {FLASK_HOST}:{FLASK_PORT}. In Colab this is local only.")
    app.run(host=FLASK_HOST, port=FLASK_PORT, debug=False, use_reloader=False)

# ---------------------------
# 11) Drive mount / demo load (ONLY in Colab)
# ---------------------------
try:
    from google.colab import drive
    drive.mount('/content/drive', force_remount=False)
    print("Drive mounted.")
except Exception:
    print("Colab drive mount skipped or not running in Colab.")

# If dataset exists, load and merge it (encrypting sensitive columns).
if os.path.exists(INCOMING_CSV_PATH):
    print("Loading dataset and inserting into secure DB (encrypting sensitive columns)...")
    res = load_and_merge_csv(INCOMING_CSV_PATH)
    print(res)
else:
    print("No dataset found at INCOMING_CSV_PATH. Create DB manually or upload file to Drive.")

# ---------------------------
# 12) Usage examples (show how to use endpoints)
# ---------------------------

print("\n=== Usage examples ===")
print("1) Generate capability token (POST /admin/generate_capability) with JSON:")
print('   {"admin_passphrase":"<PASSPHRASE>", "subject":"clientA", "actions":["read"], "ttl_minutes":30}')
print("2) Query user by email hash (POST /query):")
print('   {"template":"get_user_by_email", "params":{"email":"user@example.com"}}')
print("\nTo run the Flask app in this environment call run_app().")

# If you want to start the server automatically uncomment the following line:
# run_app()



Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Drive mounted.
Loading dataset and inserting into secure DB (encrypting sensitive columns)...
{'status': 'ok', 'inserted': 30919}

=== Usage examples ===
1) Generate capability token (POST /admin/generate_capability) with JSON:
   {"admin_passphrase":"<PASSPHRASE>", "subject":"clientA", "actions":["read"], "ttl_minutes":30}
2) Query user by email hash (POST /query):
   {"template":"get_user_by_email", "params":{"email":"user@example.com"}}

To run the Flask app in this environment call run_app().
