Skip to content

Commit

Permalink
new: Cloudflare lookup feature to flag IPs appropriately
Browse files Browse the repository at this point in the history
  • Loading branch information
Rafiot committed Jan 30, 2023
1 parent 584eb5e commit ecb4623
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 16 deletions.
49 changes: 33 additions & 16 deletions lookyloo/capturecache.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from .indexing import Indexing
from .default import LookylooException, try_make_file, get_config
from .exceptions import MissingCaptureDirectory, NoValidHarFile, MissingUUID, TreeNeedsRebuild
from .modules import Cloudflare


class CaptureCache():
Expand Down Expand Up @@ -133,6 +134,13 @@ def __init__(self, redis: Redis, contextualizer: Optional[Context]=None):
# Unable to setup IPASN History
self.logger.warning(f'Unable to setup IPASN History: {e}')
self.ipasnhistory = None
try:
self.cloudflare: Optional[Cloudflare] = Cloudflare()
if not self.cloudflare.available:
self.cloudflare = None
except Exception as e:
self.logger.warning(f'Unable to setup Cloudflare: {e}')
self.cloudflare = None

@property
def cached_captures(self) -> Set[str]:
Expand Down Expand Up @@ -474,6 +482,9 @@ def _build_cname_chain(known_cnames: Dict[str, str], hostname) -> List[str]:
elif node.name in host_ips:
node.add_feature('resolved_ips', host_ips[node.name])

if self.cloudflare:
cflare_hits = self.cloudflare.ips_lookup(_all_ips)

if self.ipasnhistory:
# Throw all the IPs to IPASN History for query later.
if ips := [{'ip': ip} for ip in _all_ips]:
Expand All @@ -490,23 +501,29 @@ def _build_cname_chain(known_cnames: Dict[str, str], hostname) -> List[str]:
r = list(response['response'].values())[0]
if ip not in ipasn and r:
ipasn[ip] = r
if ipasn:
# retraverse tree to populate it with the features
for node in ct.root_hartree.hostname_tree.traverse():
if not hasattr(node, 'resolved_ips'):
continue
ipasn_entries = {}
if 'v4' in node.resolved_ips and 'v6' in node.resolved_ips:
_all_ips = node.resolved_ips['v4'] | node.resolved_ips['v6']
else:
# old format
_all_ips = node.resolved_ips
for ip in _all_ips:
if ip not in ipasn:
continue

if ipasn or cflare_hits:
# retraverse tree to populate it with the features
for node in ct.root_hartree.hostname_tree.traverse():
if not hasattr(node, 'resolved_ips'):
continue
ipasn_entries = {}
cflare_entries = {}
if 'v4' in node.resolved_ips and 'v6' in node.resolved_ips:
_all_ips = set(node.resolved_ips['v4']) | set(node.resolved_ips['v6'])
else:
# old format
_all_ips = node.resolved_ips
for ip in _all_ips:
if ip in ipasn:
ipasn_entries[ip] = ipasn[ip]
if ipasn_entries:
node.add_feature('ipasn', ipasn_entries)
if ip in cflare_hits:
cflare_entries[ip] = True

if ipasn_entries:
node.add_feature('ipasn', ipasn_entries)
if cflare_entries:
node.add_feature('cloudflare', cflare_entries)

with cnames_path.open('w') as f:
json.dump(host_cnames, f)
Expand Down
1 change: 1 addition & 0 deletions lookyloo/modules/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@
from .hashlookup import HashlookupModule as Hashlookup # noqa
from .riskiq import RiskIQ, RiskIQError # noqa
from .urlhaus import URLhaus # noqa
from .cloudflare import Cloudflare # noqa
65 changes: 65 additions & 0 deletions lookyloo/modules/cloudflare.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#!/usr/bin/env python3

import ipaddress
import logging
from typing import Dict, List

import requests

from har2tree import CrawledTree

from ..default import ConfigError, get_config


class Cloudflare():
'''This module checks if an IP is announced by Cloudflare.'''

def __init__(self):
self.logger = logging.getLogger(f'{self.__class__.__name__}')
self.logger.setLevel(get_config('generic', 'loglevel'))

# Get IPv4
r = requests.get('https://www.cloudflare.com/ips-v4')
try:
r.raise_for_status()
ipv4_list = r.text
except Exception as e:
self.logger.warning(f'Unable to get Cloudflare IPv4 list: {e}')
self.available = False
return
# Get IPv6
try:
r = requests.get('https://www.cloudflare.com/ips-v6')
ipv6_list = r.text
except Exception as e:
self.logger.warning(f'Unable to get Cloudflare IPv6 list: {e}')
self.available = False
return

self.available = True

self.v4_list = [ipaddress.ip_network(net) for net in ipv4_list.split('\n')]
self.v6_list = [ipaddress.ip_network(net) for net in ipv6_list.split('\n')]

def capture_default_trigger(self, crawled_tree: CrawledTree, /, *, auto_trigger: bool=False) -> Dict:
'''Run the module on all the nodes up to the final redirect'''
if not self.available:
return {'error': 'Module not available'}
if auto_trigger and not self.allow_auto_trigger:
return {'error': 'Auto trigger not allowed on module'}

# TODO: trigger something?
return {'success': 'Module triggered'}

def ips_lookup(self, ips: List[str]) -> Dict[str, bool]:
'''Lookup a list of IPs. True means it is a known Cloudflare IP'''
if not self.available:
raise ConfigError('Hashlookup not available, probably not enabled.')

to_return: Dict[str, bool] = {}
for ip_s, ip_p in [(ip, ipaddress.ip_address(ip)) for ip in ips]:
if ip_p.version == 4:
to_return[ip_s] = any(ip_p in net for net in self.v4_list)
else:
to_return[ip_s] = any(ip_p in net for net in self.v6_list)
return to_return
2 changes: 2 additions & 0 deletions website/web/templates/hostname_popup.html
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,14 @@ <h5>Domain IPs from a standalone DNS lookup:</h5>
<li>
{{ ip }}{% if uwhois_available %} (<a href="{{ url_for('whois', query=ip)}}">whois</a>){% endif %}
{% if 'ipasn' in hostnode.features and hostnode.ipasn.get(ip) %}- AS{{ hostnode.ipasn[ip]['asn'] }} {% if uwhois_available %} (<a href="{{ url_for('whois', query='AS'+hostnode.ipasn[ip]['asn'])}}">whois</a>){% endif %}{% endif %}
{% if 'cloudflare' in hostnode.features and hostnode.cloudflare.get(ip) %} - Known Cloudflare IP{% endif %}
</li>
{% endfor %}
{% for ip in hostnode.resolved_ips['v6'] %}
<li>
{{ ip }}{% if uwhois_available %} (<a href="{{ url_for('whois', query=ip)}}">whois</a>){% endif %}
{% if 'ipasn' in hostnode.features and hostnode.ipasn.get(ip) %}- AS{{ hostnode.ipasn[ip]['asn'] }} {% if uwhois_available %} (<a href="{{ url_for('whois', query='AS'+hostnode.ipasn[ip]['asn'])}}">whois</a>){% endif %}{% endif %}
{% if 'cloudflare' in hostnode.features and hostnode.cloudflare.get(ip) %} - Known Cloudflare IP{% endif %}
</li>
{% endfor %}
</ul>
Expand Down

0 comments on commit ecb4623

Please sign in to comment.