import json
import os
import sys
from datetime import datetime

import requests
import tldextract
import urllib3
from pymisp import *
# Get events from MISP with the DDoSia configuration object.
# Extract unique hostnames and domains
# Optionally send to Mattermost
#
# Koen Van Impe - 2024
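# Credentials
# Taken from the environment when set, so that a real API key or webhook URL
# does not have to be committed in this file; the fallbacks are the original
# placeholder values.
misp_url = os.environ.get("MISP_URL", "MISP")
misp_key = os.environ.get("MISP_KEY", "KEY")
# Verify the MISP TLS certificate
misp_verifycert = True
# Incoming-webhook URLs. A webhook URL is effectively a secret: anyone holding
# it can post to the channel, so treat it like the API key.
mattermost_hook = os.environ.get("MATTERMOST_HOOK", "")
teams_hook = os.environ.get("TEAMS_HOOK", "")
# NOTE: this default lives below the MISP webroot, so the written file may be
# served to web clients.
ddosia_file_output = "/var/www/MISP/app/webroot/misp-export/ddosia.txt"
# Output (filled by the processing loop below; sorted before reporting)
target_hostnames = []
target_domains = []
# Send to Mattermost?
send_mattermost = False
# Send to Teams
send_teams = False
# Write to file
write_to_ddosia_file_output = False
# MISP organisation "witha.name" (organisation UUID used as the search filter)
query_org = "ae763844-03bf-4588-af75-932d5ed2df8c"
# Only published events?
published = True
# Limit for recent events (relative date filter passed to the MISP search)
date_filter = "1d"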
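# Create PyMISP object and test connectivity
misp = PyMISP(misp_url, misp_key, misp_verifycert)
print(f"Extract hostnames from {misp_url}")

# Seconds to wait for a webhook response. requests has no default timeout, so
# without this a stalled Mattermost/Teams endpoint would hang the whole run.
webhook_timeout = 30


def _is_usable_hostname(value):
    """Return True when *value* is safe to emit as a single output line.

    The values are attribute values from a shared MISP feed. Whitespace or
    non-printable characters inside a value would break the one-entry-per-line
    text file and the markdown lists built below, so such values are skipped.
    """
    if not value:
        return False
    return all(ch.isprintable() and not ch.isspace() for ch in value)


def _registered_domain(hostname):
    """Return ``domain.suffix`` for *hostname* as split by tldextract.

    Empty parts are dropped: for an IP address or a single-label name
    tldextract returns an empty suffix, and a plain join of the two parts
    would then produce a dangling trailing dot.
    """
    extracted = tldextract.extract(hostname)
    return ".".join(part for part in (extracted.domain, extracted.suffix) if part)


def _collect_targets(events):
    """Fill ``target_hostnames`` / ``target_domains`` from *events*.

    Only ``hostname`` attributes of ``ddos-config`` objects are used. Values
    are lower-cased and stripped, de-duplicated (set-backed, so the cost stays
    linear in the number of attributes) and appended in order of first
    appearance; the caller sorts both lists afterwards.
    """
    seen_hostnames = set(target_hostnames)
    seen_domains = set(target_domains)
    for event in events:
        print(" Event {} ({})".format(event.info, event.uuid))
        # misp_object rather than object, to avoid shadowing the builtin
        for misp_object in event.objects:
            if misp_object.name != "ddos-config":
                continue
            for attribute in misp_object.Attribute:
                if attribute.type != "hostname":
                    continue
                check_value = attribute.value.lower().strip()
                if not _is_usable_hostname(check_value):
                    # repr() so odd characters cannot mangle the console output either
                    print(f" Skipping unusable hostname value {check_value!r} in event {event.uuid}")
                    continue
                if check_value in seen_hostnames:
                    continue
                seen_hostnames.add(check_value)
                target_hostnames.append(check_value)
                print(f" Found {check_value}")
                domain = _registered_domain(check_value)
                if domain and domain not in seen_domains:
                    seen_domains.add(domain)
                    target_domains.append(domain)


def _build_summaries():
    """Return ``(summary, summary_md)`` listing the sorted hostnames and domains.

    ``summary`` is the plain-text layout written to the export file,
    ``summary_md`` the markdown layout sent to the chat webhooks.
    """
    summary = "Hostnames\n------------\n" + "".join(f"\n{t}" for t in target_hostnames)
    summary += "\n\nDomains\n----------\n" + "".join(f"\n{t}" for t in target_domains)
    summary_md = "# Hostnames\n" + "".join(f"\n- {t}" for t in target_hostnames)
    summary_md += "\n\n# Domains\n" + "".join(f"\n- {t}" for t in target_domains) + "\n"
    return summary, summary_md


def _post_webhook(label, url, payload):
    """POST *payload* as JSON to the webhook *url* and report the outcome.

    A failed delivery is reported on stderr but does not abort the run, so the
    remaining outputs (second webhook, export file) are still produced.
    """
    response = requests.post(url, json=payload, timeout=webhook_timeout)
    print(label, response, response.status_code, response.text)
    if not response.ok:
        print(f" Warning: {label} webhook returned HTTP {response.status_code}", file=sys.stderr)


# Search for events
events = misp.search("events", pythonify=True, org=query_org, published=published, date=date_filter)

# PyMISP reports API-level errors as a dict with an "errors" key instead of
# raising; stop here rather than silently treating that as "no events".
if isinstance(events, dict) and "errors" in events:
    sys.exit(f"MISP search failed: {events['errors']}")

# Process events
if events:
    print("Parsing {} events".format(len(events)))
    _collect_targets(events)
    if target_hostnames:
        target_hostnames.sort()
        target_domains.sort()
        title = "DDoSia config: Parsed {} MISP events and found {} unique hostnames for {} domains - ({}, last {})".format(
            len(events), len(target_hostnames), len(target_domains), datetime.now().date(), date_filter
        )
        summary, summary_md = _build_summaries()
        if send_mattermost:
            # Keep the title-prefixed text local so that the Teams card below
            # does not also get the title prepended to its body when both are on.
            message = {"username": "witha.name-reporters", "text": title + summary_md + "\n"}
            _post_webhook("Mattermost", mattermost_hook, message)
        if send_teams:
            message = {
                "type": "message",
                "attachments": [
                    {
                        "contentType": "application/vnd.microsoft.teams.card.o365connector",
                        "content": {
                            "$schema": "http://adaptivecards.io/schemas/adaptive-card.json",
                            "type": "MessageCard",
                            "context": "https://schema.org/extensions",
                            "title": title,
                            "version": "1.0",
                            "sections": [
                                {
                                    "text": summary_md
                                }
                            ]
                        }
                    }
                ]
            }
            _post_webhook("Teams", teams_hook, message)
        if write_to_ddosia_file_output:
            # The configured default path is below the MISP webroot, so this
            # file may be served to web clients; only validated values reach it.
            with open(ddosia_file_output, "w", encoding="utf-8") as output_file:
                output_file.write(title + "\n\n" + summary + "\n")
else:
    print("No events found.")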