Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added host header injection vulnerability module #279

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import random

from canari.maltego.transform import Transform
from canari.maltego.entities import URL
from canari.framework import EnableDebugWindow
from common.entities import NettackerScan

from lib.vuln.host_header.engine import start

from database.db import __logs_by_scan_id as find_log

__author__ = 'Shaddy Garg'
__copyright__ = 'Copyright 2018, nettacker_transforms Project'
__credits__ = []

__license__ = 'GPLv3'
__version__ = '0.1'
__maintainer__ = 'Shaddy Garg'
__email__ = 'shaddygarg1@gmail.com'
__status__ = 'Development'


@EnableDebugWindow
class HostHeaderVulnScan(Transform):
"""TODO: Your transform description."""

# The transform input entity type.
input_type = NettackerScan

def do_transform(self, request, response, config):
# TODO: write your code here.
scan_request = request.entity
scan_id = "".join(random.choice("0123456789abcdef") for x in range(32))
scan_request.ports = scan_request.ports.split(', ') if scan_request.ports is not None else None
start(scan_request.host, [], [], scan_request.ports, scan_request.timeout_sec, scan_request.thread_no,
1, 1, 'abcd', 0, "en", scan_request.verbose, scan_request.socks_proxy, scan_request.retries, [], scan_id,
"Through Maltego")
results = find_log(scan_id, "en")
for result in results:
url = result["HOST"] + ":" + result["PORT"]
response += URL(url=url, title=result["DESCRIPTION"],
short_title="Host/x-forwarded-for/x-forwarded-host header Vulnerability Found!",
link_label='host_header_vuln')
return response

def on_terminate(self):
"""This method gets called when transform execution is prematurely terminated. It is only applicable for local
transforms. It can be excluded if you don't need it."""
pass
3 changes: 3 additions & 0 deletions lib/vuln/host_header/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
pass
310 changes: 310 additions & 0 deletions lib/vuln/host_header/engine.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,310 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: Aman Gupta , github.com/aman566

import socket
import socks
import time
import json
import threading
import string
import random
import sys
import struct
import re
import os
from OpenSSL import crypto
import ssl
from core.alert import *
from core.targets import target_type
from core.targets import target_to_host
from core.load_modules import load_file_path
from lib.socks_resolver.engine import getaddrinfo
from core._time import now
from core.log import __log_into_file
import requests
from lib.payload.wordlists import useragents


def extra_requirements_dict():
return {"host_header_vuln_ports": [80, 443]}


def conn(targ, port, timeout_sec, socks_proxy):
try:
if socks_proxy is not None:
socks_version = (
socks.SOCKS5 if socks_proxy.startswith("socks5://") else socks.SOCKS4
)
socks_proxy = socks_proxy.rsplit("://")[1]
if "@" in socks_proxy:
socks_username = socks_proxy.rsplit(":")[0]
socks_password = socks_proxy.rsplit(":")[1].rsplit("@")[0]
socks.set_default_proxy(
socks_version,
str(socks_proxy.rsplit("@")[1].rsplit(":")[0]),
int(socks_proxy.rsplit(":")[-1]),
username=socks_username,
password=socks_password,
)
socket.socket = socks.socksocket
socket.getaddrinfo = getaddrinfo
else:
socks.set_default_proxy(
socks_version,
str(socks_proxy.rsplit(":")[0]),
int(socks_proxy.rsplit(":")[1]),
)
socket.socket = socks.socksocket
socket.getaddrinfo = getaddrinfo()
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sys.stdout.flush()
s.settimeout(timeout_sec)
s.connect((targ, port))
return s
except:
return None


def host_header_vuln(
target,
port,
timeout_sec,
log_in_file,
language,
time_sleep,
thread_tmp_filename,
socks_proxy,
scan_id,
scan_cmd,
):
try:
s = conn(target_to_host(target), port, timeout_sec, socks_proxy)
if not s:
return False
else:
global result
host_headers = {
"Host": "evil.com",
"User-Agent": random.choice(useragents.useragents()),
}
try:
req_host = requests.get(
target,
verify=False,
timeout=timeout_sec,
headers=host_headers,
allow_redirects=False,
)
if "evil.com" in req_host.headers["Location"]:
result = "Found Host header injection vulnerability"
return True
except:
pass
x_forwarded_for_headers = {
"x-forwarded-for": "evil.com",
"User-Agent": random.choice(useragents.useragents()),
}
try:
req_forwarded_for = requests.get(
target,
verify=False,
timeout=timeout_sec,
headers=x_forwarded_for_headers,
allow_redirects=False,
)
if "evil.com" in req_forwarded_for.text:
result = "Response contains 'evil.com' from x-forwarded-for: evil.com header value. May be vulnerable to cross-site scripting!!"
return True
if "evil.com" in req_forwarded_for.headers["Location"]:
result = "Found X-forwarded-for header injection vulnerability."
return True
except:
pass
x_forwarded_host_headers = {
"x-forwarded-host": "evil.com",
"User-Agent": random.choice(useragents.useragents()),
}
try:
req_forwarded_host = requests.get(
target,
verify=False,
timeout=timeout_sec,
headers=x_forwarded_host_headers,
allow_redirects=False,
)
if "evil.com" in req_forwarded_host.headers["Location"]:
result = "Found X-forwarded-Host header injection vulnerability"
return True
except:
return False
except:
# some error warning
return False


def __host_header_vuln(
target,
port,
timeout_sec,
log_in_file,
language,
time_sleep,
thread_tmp_filename,
socks_proxy,
scan_id,
scan_cmd,
):
if host_header_vuln(
target,
port,
timeout_sec,
log_in_file,
language,
time_sleep,
thread_tmp_filename,
socks_proxy,
scan_id,
scan_cmd,
):
info(messages(language, "target_vulnerable").format(target, port, result))
__log_into_file(thread_tmp_filename, "w", "0", language)
data = json.dumps(
{
"HOST": target,
"USERNAME": "",
"PASSWORD": "",
"PORT": port,
"TYPE": "host_header_vuln",
"DESCRIPTION": messages(language, "vulnerable").format(result),
"TIME": now(),
"CATEGORY": "vuln",
"SCAN_ID": scan_id,
"SCAN_CMD": scan_cmd,
}
)
__log_into_file(log_in_file, "a", data, language)
return True
else:
return False


def start(
target,
users,
passwds,
ports,
timeout_sec,
thread_number,
num,
total,
log_in_file,
time_sleep,
language,
verbose_level,
socks_proxy,
retries,
methods_args,
scan_id,
scan_cmd,
): # Main function
if (
target_type(target) != "SINGLE_IPv4"
or target_type(target) != "DOMAIN"
or target_type(target) != "HTTP"
):
# requirements check
new_extra_requirements = extra_requirements_dict()
if methods_args is not None:
for extra_requirement in extra_requirements_dict():
if extra_requirement in methods_args:
new_extra_requirements[extra_requirement] = methods_args[
extra_requirement
]
extra_requirements = new_extra_requirements
if ports is None:
ports = extra_requirements["host_header_vuln_ports"]
threads = []
total_req = len(ports)
thread_tmp_filename = "{}/tmp/thread_tmp_".format(load_file_path()) + "".join(
random.choice(string.ascii_letters + string.digits) for _ in range(20)
)
__log_into_file(thread_tmp_filename, "w", "1", language)
trying = 0
keyboard_interrupt_flag = False
for port in ports:
port = int(port)
t = threading.Thread(
target=__host_header_vuln,
args=(
target,
int(port),
timeout_sec,
log_in_file,
language,
time_sleep,
thread_tmp_filename,
socks_proxy,
scan_id,
scan_cmd,
),
)
threads.append(t)
t.start()
trying += 1
if verbose_level > 3:
info(
messages(language, "trying_message").format(
trying, total_req, num, total, target, port, "host_header_vuln"
)
)
while 1:
try:
if threading.activeCount() >= thread_number:
time.sleep(0.01)
else:
break
except KeyboardInterrupt:
keyboard_interrupt_flag = True
break
if keyboard_interrupt_flag:
break
# wait for threads
kill_switch = 0
kill_time = int(timeout_sec / 0.1) if int(timeout_sec / 0.1) != 0 else 1
while 1:
time.sleep(0.1)
kill_switch += 1
try:
if threading.activeCount() == 1 or kill_switch == kill_time:
break
except KeyboardInterrupt:
break
thread_write = int(open(thread_tmp_filename).read().rsplit()[0])
if thread_write == 1 and verbose_level != 0:
info(
messages(language, "no_vulnerability_found").format("host_header_vuln")
)
data = json.dumps(
{
"HOST": target,
"USERNAME": "",
"PASSWORD": "",
"PORT": "",
"TYPE": "host_header_vuln",
"DESCRIPTION": messages(language, "no_vulnerability_found").format(
"host_header_vuln"
),
"TIME": now(),
"CATEGORY": "scan",
"SCAN_ID": scan_id,
"SCAN_CMD": scan_cmd,
}
)
__log_into_file(log_in_file, "a", data, language)
os.remove(thread_tmp_filename)

else:
warn(
messages(language, "input_target_error").format("host_header_vuln", target)
)