-
Notifications
You must be signed in to change notification settings - Fork 0
/
dns_traffic_analyzer.py
162 lines (132 loc) · 6.76 KB
/
dns_traffic_analyzer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
import logging
import logging.config
import os
import socket
import struct
import time

import dpkt
import requests as requests
from dpkt import dns
# Configure the logging system from "logging_config.ini". The path is relative,
# so it is resolved against the current working directory at import time.
logging.config.fileConfig("logging_config.ini")
# Module-wide logger; DNSTrafficAnalyzer reports every finding and error on it.
logger = logging.getLogger("DNSAnalyzer")
class DNSTrafficAnalyzer:
    """Scan a pcap capture for UDP/53 DNS traffic and log heuristic findings.

    Every ``is_*`` check takes a parsed DNS message (an object exposing the
    ``qd`` / ``an`` / ``rcode`` attributes that ``dpkt.dns.DNS`` provides) and
    returns a bool. The checks are duck-typed: resource records only need
    ``name``, ``type`` and ``rdata`` attributes.
    """

    # Substrings that mark a queried name as suspicious.
    SUSPICIOUS_KEYWORDS = ("malware", "phishing", "botnet")
    # Distinct addresses for one name at which a response counts as fast flux.
    FAST_FLUX_MIN_ADDRESSES = 2
    # Seconds to wait for the VirusTotal API before giving up.
    VIRUSTOTAL_TIMEOUT = 10
    # Address-bearing record types: A = 1 (RFC 1035), AAAA = 28 (RFC 3596).
    # These equal dpkt.dns.DNS_A / dpkt.dns.DNS_AAAA.
    _ADDRESS_FAMILIES = {1: socket.AF_INET, 28: socket.AF_INET6}
    # NOTE(review): 16 is the BADSIG error value. The DNS header rcode is only
    # 4 bits wide, so confirm where this value is expected to come from.
    _RCODE_BADSIG = 16

    def __init__(self, pcap_file):
        """Remember the path of the capture file to analyze."""
        self.pcap_file = pcap_file
        # Lazily populated cache of known-malicious IPs (see
        # _known_malicious_ips); None means "not fetched yet".
        self._malicious_ips = None

    def analyze_dns_traffic(self):
        """Read the capture and log a warning for every heuristic that fires.

        A missing capture file is logged as an error and swallowed. Packets
        that cannot be parsed are skipped so one corrupt frame does not abort
        the run. Other errors raised while opening or reading the capture
        propagate to the caller.
        """
        try:
            with open(self.pcap_file, 'rb') as pcap:
                for _timestamp, buf in dpkt.pcap.Reader(pcap):
                    parsed = self._parse_dns_packet(buf)
                    if parsed is not None:
                        self._inspect_message(*parsed)
        except FileNotFoundError:
            logger.error(f"PCAP file '{self.pcap_file}' not found")

    def _parse_dns_packet(self, buf):
        """Return ``(message, src, dst)`` for an IPv4 UDP/53 frame, else None."""
        try:
            eth = dpkt.ethernet.Ethernet(buf)
            ip = eth.data
            if not isinstance(ip, dpkt.ip.IP):
                return None
            udp = ip.data
            if not isinstance(udp, dpkt.udp.UDP):
                return None
            if udp.sport != 53 and udp.dport != 53:
                return None
            message = dpkt.dns.DNS(udp.data)
        except (dpkt.dpkt.UnpackError, struct.error, IndexError, ValueError) as exc:
            # Capture data is untrusted: truncated or malformed packets are
            # expected and must not stop the analysis.
            logger.debug("Skipping unparseable packet: %s", exc)
            return None
        return message, self._format_ip(ip.src), self._format_ip(ip.dst)

    def _inspect_message(self, message, src, dst):
        """Run every heuristic on one DNS message and log the ones that fire."""
        # Check for suspicious domain lookups
        for query in message.qd:
            domain = self._to_text(query.name)
            if self.is_suspicious_domain(domain):
                logger.warning(
                    "Suspicious domain lookup: %s from %s to %s", domain, src, dst
                )
        # Check for potential DNS tunneling
        if self.is_dns_tunneling(message):
            logger.warning(
                "Potential DNS tunneling detected from %s to %s", src, dst
            )
        # Check for communication with known malicious domains
        if self.is_malicious_communication(message):
            logger.warning(
                "Communication with malicious domain detected from %s to %s",
                src,
                dst,
            )
        # Check for DNSSEC validation failures
        if self.is_dnssec_validation_failure(message):
            logger.warning('DNSSEC validation failure detected')
        # Check for DNS amplification attacks
        if self.is_dns_amplification_attack(message):
            logger.warning('DNS amplification attack detected')
        # Check for fast flux domains
        if self.is_fast_flux_domain(message):
            logger.warning('Fast flux domain detected')
        # Check for DNS rebinding
        if self.is_dns_rebinding(message):
            logger.warning('DNS rebinding detected')

    @staticmethod
    def _to_text(name):
        """Return a DNS name as lower-case text, accepting ``str`` or ``bytes``."""
        if isinstance(name, bytes):
            name = name.decode('utf-8', errors='replace')
        return str(name).lower()

    @staticmethod
    def _format_ip(raw):
        """Render a packed 4- or 16-byte address as text; other values via str()."""
        if isinstance(raw, (bytes, bytearray)):
            if len(raw) == 4:
                return socket.inet_ntop(socket.AF_INET, bytes(raw))
            if len(raw) == 16:
                return socket.inet_ntop(socket.AF_INET6, bytes(raw))
        return str(raw)

    @staticmethod
    def _wire_name_len(name):
        """Return the uncompressed wire length of a DNS name.

        Each label costs one length byte plus its content, and the name ends
        with a single zero byte for the root label.
        """
        raw = name if isinstance(name, bytes) else str(name).encode('utf-8')
        labels = [label for label in raw.split(b'.') if label]
        return sum(len(label) + 1 for label in labels) + 1

    def _answer_ips(self, dns):
        """Yield ``(owner_name, ip_text)`` for every A/AAAA answer record.

        Other record types, and records whose rdata length does not match
        their type, are skipped instead of raising.
        """
        for rr in getattr(dns, 'an', None) or ():
            family = self._ADDRESS_FAMILIES.get(getattr(rr, 'type', None))
            if family is None:
                continue
            try:
                ip_text = socket.inet_ntop(family, getattr(rr, 'rdata', b''))
            except (OSError, ValueError, TypeError):
                continue
            yield self._to_text(getattr(rr, 'name', '')), ip_text

    def _known_malicious_ips(self):
        """Return the malicious-IP set, fetching it at most once per instance.

        A failed fetch is cached as an empty set, so a broken feed is not
        retried for every packet in the capture.
        """
        cached = getattr(self, '_malicious_ips', None)
        if cached is None:
            cached = frozenset(self.fetch_malicious_ips())
            self._malicious_ips = cached
        return cached

    def is_suspicious_domain(self, domain):
        """Return True if *domain* contains a suspicious keyword (any case)."""
        text = self._to_text(domain)
        return any(keyword in text for keyword in self.SUSPICIOUS_KEYWORDS)

    def is_dns_tunneling(self, dns):
        """Return True if any question asks for a type other than A or AAAA."""
        return any(
            getattr(query, 'type', None) not in self._ADDRESS_FAMILIES
            for query in getattr(dns, 'qd', None) or ()
        )

    def is_malicious_communication(self, dns):
        """Return True if an A/AAAA answer is in the malicious-IP feed."""
        answer_ips = [ip for _name, ip in self._answer_ips(dns)]
        if not answer_ips:
            # Queries and address-less responses never need the feed.
            return False
        malicious_ips = self._known_malicious_ips()
        return any(ip in malicious_ips for ip in answer_ips)

    def is_dnssec_validation_failure(self, dns):
        """Return True if the message's response code equals BADSIG (16).

        The response code is an enumerated value, so it is compared for
        equality instead of being tested as a bit flag.
        """
        return getattr(dns, 'rcode', 0) == self._RCODE_BADSIG

    def is_dns_amplification_attack(self, dns):
        """Return True if the answer section is larger than the first question.

        Sizes are uncompressed wire-format estimates: a question is its name
        plus QTYPE/QCLASS (4 bytes); a record is its name, the 10-byte fixed
        header (type, class, TTL, rdlength) and its rdata. Messages without a
        question or without answers are never flagged.
        """
        questions = getattr(dns, 'qd', None) or ()
        answers = getattr(dns, 'an', None) or ()
        if not questions or not answers:
            return False
        query_size = self._wire_name_len(questions[0].name) + 4
        response_size = sum(
            self._wire_name_len(getattr(rr, 'name', ''))
            + 10
            + len(getattr(rr, 'rdata', b'') or b'')
            for rr in answers
        )
        return response_size > query_size

    def is_fast_flux_domain(self, dns):
        """Return True if one name resolves to several distinct addresses.

        The threshold is ``FAST_FLUX_MIN_ADDRESSES`` distinct A/AAAA addresses
        for the same owner name within a single response.
        """
        addresses_by_name = {}
        for name, ip_text in self._answer_ips(dns):
            seen = addresses_by_name.setdefault(name, set())
            seen.add(ip_text)
            if len(seen) >= self.FAST_FLUX_MIN_ADDRESSES:
                return True
        return False

    def is_dns_rebinding(self, dns):
        """Return True if an answer address does not reverse-resolve to the query.

        Messages with an empty question section are never flagged.
        """
        questions = getattr(dns, 'qd', None) or ()
        if not questions:
            return False
        query_domain = self._to_text(questions[0].name)
        return any(
            not self.ip_matches_domain(ip_text, query_domain)
            for _name, ip_text in self._answer_ips(dns)
        )

    def ip_matches_domain(self, ip_address, domain):
        """Return True if the reverse-DNS name of *ip_address* contains *domain*.

        Uses the system resolver through ``socket.gethostbyaddr``. Failed
        lookups return False; a timeout is additionally logged as a warning.
        """
        try:
            hostname, aliases, _addresses = socket.gethostbyaddr(ip_address)
        except socket.timeout:
            logger.warning("DNS resolution timeout occurred")
            return False
        except OSError:
            # Covers socket.herror / socket.gaierror: no PTR record or an
            # address the resolver rejects.
            return False
        wanted = self._to_text(domain)
        return any(wanted in name.lower() for name in (hostname, *aliases))

    def fetch_malicious_ips(self):
        """Return up to 100 IPs analysed by VirusTotal in the last 180 days.

        The API key is read from the ``VIRUSTOTAL_API_KEY`` environment
        variable. Any failure (missing key, network error, unexpected response
        shape) is logged and results in an empty list.
        """
        api_key = os.environ.get("VIRUSTOTAL_API_KEY", "")
        if not api_key:
            logger.error(
                "VIRUSTOTAL_API_KEY is not set; skipping malicious IP lookup"
            )
            return []
        six_months_ago = int(time.time()) - (180 * 24 * 60 * 60)  # 180 days in seconds
        try:
            # NOTE(review): confirm this endpoint and the
            # attributes.ip_address field against the VirusTotal v3 docs.
            response = requests.get(
                "https://www.virustotal.com/api/v3/ip_addresses",
                params={
                    "filter": f"last_analysis_date:>{six_months_ago}",
                    "limit": 100,
                },
                headers={"x-apikey": api_key},
                timeout=self.VIRUSTOTAL_TIMEOUT,
            )
            response.raise_for_status()
            json_data = response.json()
            # Extract the malicious IP addresses
            return [ip['attributes']['ip_address'] for ip in json_data['data']]
        except requests.exceptions.RequestException as e:
            logger.error(f"Error occurred while fetching malicious IPs from VirusTotal: {str(e)}")
        except (KeyError, TypeError, ValueError) as e:
            logger.error(
                "Unexpected VirusTotal response while fetching malicious IPs: %r", e
            )
        return []