/
defang.py
131 lines (116 loc) · 4.78 KB
/
defang.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import re
import itertools
from urllib.parse import urlparse, urlunparse
from refinery.units import Arg, Unit
from refinery.lib.patterns import defanged, indicators, tlds
class defang(Unit):
    """
    Defangs all URL, domain and IPv4 address indicators in the input data by replacing the last dot
    in the expression by `[.]`. For example, `127.0.0.1` will be replaced by `127.0.0[.]1`. For URL
    indicators, the colon after the protocol scheme is also wrapped in brackets.
    """

    # Host names (lowercase) that look like domains but must never be defanged.
    _WHITELIST = [
        B'wscript.shell',
    ]

    # Scheme substitutions applied when protocol escaping is requested.
    _PROTOCOL_ESCAPES = {
        B'http': B'hxxp',
        B'https': B'hxxps',
        B'ftp': B'fxp',
        B'ftps': B'fxps',
    }

    def __init__(
        self,
        url_only: Arg.Switch('-u', help='Only defang URLs, do not look for domains or IPs.') = False,
        url_protocol: Arg.Switch('-p', help='Escape the protocol in URLs.') = False,
        dot_only: Arg.Switch('-d', help='Do not escape the protocol colon in URLs.') = False,
        quote_md: Arg.Switch('-q', help='Wrap all indicators in backticks for markdown code.') = False
    ):
        # Forward every local (including self) to the base class initializer.
        self.superinit(super(), **vars())

    def _quote(self, word):
        """
        Return `word` wrapped in backticks when markdown quoting is enabled, otherwise unchanged.
        """
        if self.args.quote_md:
            return B'`%s`' % word
        return word

    def reverse(self, data: bytearray):
        """
        Undo defanging: restore `[.]` to `.` inside host names, restore bracketed protocol
        separators, and map escaped schemes back to `http(s)` and `ftp(s)`.
        """
        def refang(hostname):
            return hostname[0].replace(B'[.]', B'.')
        data = defanged.hostname.sub(refang, data)
        data = data.replace(B'[:]//', B'://')
        data = data.replace(B'[://]', B'://')
        # Deliberately loose: any "h" followed by three arbitrary characters before "://" (or
        # "s://") is treated as an escaped http scheme, which covers variants such as hxxp.
        data = re.sub(B'h.{3}?(s?)://', B'http\\1://', data)
        data = re.sub(B'fxp(s?)://', B'ftp\\1://', data)
        return data

    def process(self, data):
        """
        Defang all URLs in `data` and, unless only URLs were requested, all remaining host name
        indicators in the text between the URLs. Returns the reassembled byte string.
        """
        def defang_host(hostname: bytes):
            # Defang a single `[user@]host[:port]` expression given as raw bytes.
            self.log_info('replace:', hostname)
            user, atsgn, host = hostname.rpartition(B'@')
            head, colon, tail = host.rpartition(B':')
            if colon:
                # The emitted host has always been lowercased when a port is present; this is
                # kept so that existing output does not change.
                host, port = head.lower(), tail
            else:
                host, port = tail, B''
            # Compare case-insensitively: the whitelist entries are lowercase, and previously a
            # host without a port was compared in its original case and never matched.
            if host.lower() in self._WHITELIST:
                return hostname
            labels = re.split(R'(?:\[\.\]|\.)', host.decode('latin1'))
            if len(labels) == 1:
                # Nothing to defang without at least one dot.
                return hostname
            # Walk the labels from right to left. The rightmost dot is always bracketed; each
            # further dot is bracketed as long as the label to its right is a known TLD, which
            # handles multi-part suffixes such as "co.uk".
            components = iter(reversed(labels))
            defanged_parts = [next(components)]
            separator = '[.]'
            for part in components:
                defanged_parts.append(separator)
                defanged_parts.append(part)
                # NOTE(review): assumes `tlds` holds lowercase strings, as the port-bearing path
                # has always passed lowercased labels here; lowercasing makes both paths agree.
                separator = '[.]' if part.lower() in tlds else '.'
            defanged_host = ''.join(reversed(defanged_parts)).encode('latin1')
            return user + atsgn + defanged_host + colon + port

        def replace_hostname(match):
            # Regex substitution callback for host names found outside of URLs.
            return self._quote(defang_host(match[0]))

        def replace_url(url: bytes):
            # Defang a single URL; empty or missing values are passed through untouched.
            if not url:
                return url
            self.log_info('replace:', url)
            # Normalize an already (partially) defanged URL before parsing it.
            url = url.replace(B'[:]//', B'://', 1)
            url = url.replace(B'[.]', B'.')
            # URLs without a scheme get a temporary one so that urlparse detects the netloc:
            #   0 = starts with "://", 1 = starts with "//", 2 = scheme is present.
            prefix = B'tcp'
            if url.startswith(B'://'):
                scheme = 0
            elif url.startswith(B'//'):
                scheme = 1
                prefix = prefix + B':'
            else:
                scheme = 2
                prefix = B''
            parsed = urlparse(prefix + url)
            # Indicators may also hide in the remaining URL components; defang them recursively.
            operations = {
                name: self.process(getattr(parsed, name))
                for name in ('path', 'params', 'query', 'fragment')
            }
            if self.args.url_protocol and parsed.scheme:
                # Schemes without a known escape are kept as they are. The previous fallback was
                # the integer marker above, which made urlunparse fail for such URLs.
                operations.update(
                    scheme=self._PROTOCOL_ESCAPES.get(parsed.scheme.lower(), parsed.scheme))
            if scheme < 2:
                # Drop the temporary scheme again.
                operations.update(scheme=B'')
            operations.update(netloc=defang_host(parsed.netloc))
            url = urlunparse(parsed._replace(**operations))
            if scheme == 0:
                url = B':' + url
            if not self.args.dot_only:
                url = url.replace(B'://', B'[:]//')
            return self._quote(url)

        # Splitting on the URL pattern yields plain text at every `step`-th index, followed by
        # the pattern's capture groups; the item right after each text chunk is taken to be the
        # URL itself.
        urlsplit = defanged.url.split(data)
        step = defanged.url.value.groups + 1
        urlsplit[1::step] = [replace_url(t) for t in urlsplit[1::step]]
        if not self.args.url_only:
            urlsplit[0::step] = [
                indicators.hostname.sub(replace_hostname, t)
                for t in urlsplit[0::step]
            ]
        # Interleave text and URL chunks again. There is always one more text chunk than there
        # are URLs, so the final missing URL is padded with an empty byte string.
        texts = urlsplit[0::step]
        urls = urlsplit[1::step]
        return B''.join(itertools.chain.from_iterable(
            itertools.zip_longest(texts, urls, fillvalue=B'')))