-
Notifications
You must be signed in to change notification settings - Fork 0
/
classes.py
205 lines (168 loc) · 6.61 KB
/
classes.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
import datetime
import logging
import zlib
from http import HTTPStatus
from typing import Generator, Literal
import requests
from minio import Minio
Proto = Literal["http", "https"]
Url = str
class Archive:
def __init__(
self,
host: str = None,
port: int = None,
access_key: str = None,
secret_key: str = None,
compliancy_bucket: str = None,
ssl_verify: bool = None,
):
self.log = logging.getLogger("Minio")
self.log.setLevel(logging.INFO)
self.host = host if host is not None else "localhost"
self.port = port if port is not None else "9000"
self.access_key = access_key if access_key is not None else "admin"
self.secret_key = secret_key if secret_key is not None else "?"
self.compliancy_bucket = (
compliancy_bucket if compliancy_bucket is not None else "compliancy-bucket"
)
self.ssl_verify = ssl_verify if ssl_verify is not None else False
self.client = Minio(
endpoint=f"{self.host}:{self.port}",
access_key=self.access_key,
secret_key=self.secret_key,
secure=True,
cert_check=self.ssl_verify,
)
def __repr__(self) -> str:
return f"{self.__class__.__name__}: {self.host} {self.access_key} \
{self.secret_key} {self.compliancy_bucket}"
def bucket_prefix(self, thisday: datetime, sourcetype: str = None) -> str:
prefix = f"year={thisday.year:0{4}}"
prefix += f"/month={thisday.month:0{2}}"
prefix += f"/day={thisday.day:0{2}}/"
if sourcetype is not None:
prefix += f"sourcetype={sourcetype}/"
self.log.debug(f"Bucket prefix {prefix}")
return prefix
def list_objects(
self, thisday: datetime, sourcetype: str = None, recurse: bool = None
) -> Generator:
# Recurse by default in case sourcetype is ommitted here
# but present in the archived object
recurse = recurse if recurse is not None else True
objects = self.client.list_objects(
bucket_name=self.compliancy_bucket,
prefix=self.bucket_prefix(thisday, sourcetype),
recursive=recurse,
)
return objects
def get_lines(self, object_name: str) -> bytes:
object = self.client.get_object(
bucket_name=self.compliancy_bucket, object_name=object_name
)
# Read data from object.
gzip_content = object.read(decode_content=True)
# Decode .gz
# https://stackoverflow.com/questions/1838699/how-can-i-decompress-a-gzip-stream-with-zlib
lines = zlib.decompress(gzip_content, 15 + 32)
return lines
@property
def url(self) -> Url:
return f"https://{self.host}:{self.port}"
@property
def check_connectivity(self) -> bool:
requests.packages.urllib3.disable_warnings()
self.log.info(f"Checking Archive Server {self.url} reachability.")
minio_reachable = False
try:
response = requests.post(
url=self.url,
headers=dict(),
data=dict(),
verify=False,
)
minio_reachable = True
self.log.debug(f"Status: {response.status_code}")
except Exception:
self.log.warning(f"Archive host {self.url} not reachable!")
return minio_reachable
@property
def check_compliancy_bucket(self) -> bool:
self.log.info(f"Checking availability of {self.compliancy_bucket} in archive")
buckets = self.client.list_buckets()
compliancy_bucket_exists = self.compliancy_bucket in buckets
return compliancy_bucket_exists
class Destination:
def __init__(
self,
host: str = None,
port: int = None,
token: str = None,
proto: Proto = None,
ssl_verify: bool = None,
):
self.log = logging.getLogger("HEC")
self.log.setLevel(logging.INFO)
self.host = host if host is not None else "localhost"
self.port = port if port is not None else "8088"
self.token = token if token is not None else "aa-bb"
self.proto = proto if proto is not None else "https"
self.ssl_verify = ssl_verify if ssl_verify is not None else False
def __repr__(self) -> str:
return f"{self.__class__.__name__}: {self.host} {self.token}"
@property
def url(self) -> Url:
return f"{self.proto}://{self.host}:{self.port}/services/collector/event"
@property
def headers(self) -> dict:
return {
"Authorization": "Splunk " + self.token,
"Content-Type": "application/json",
}
@property
def check_connectivity(self) -> bool:
requests.packages.urllib3.disable_warnings()
self.log.info("Checking HEC Server URI reachability.")
hec_reachable = False
# acceptable_status_codes = [400, 401, 403]
# heath_warning_status_codes = [500, 503]
try:
response = requests.post(
url=self.url,
headers=self.headers,
data=dict(),
verify=False,
)
hec_reachable = True
self.log.debug(f"Status: {response.status_code}")
except Exception:
self.log.error(f"Splunk Server {self.host} is unreachable.")
return hec_reachable
def sendMultiLines(self, payload: str) -> HTTPStatus:
requests.packages.urllib3.disable_warnings()
status = HTTPStatus.SERVICE_UNAVAILABLE
try:
response = requests.post(
url=self.url,
headers=self.headers,
data=payload,
verify=self.ssl_verify,
)
status = response.status_code
self.log.debug(f"Status: {response.status_code}")
except Exception:
logging.error(f"Connection to {self.host} refused!")
return status
if __name__ == "__main__":
from config import destination
logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)
payload = """{"time":1701388800,"event":"Current Time = 01:00:00\\n",
"host":"uf1","source":"/opt/splunkforwarder/etc/apps/normal/bin/heure.py",
"sourcetype":"heure","index":"cust1","fields":{"cust":"customer-normal"}}\n
{"time":1701388800,"event":"Current Time = 01:00:00\\n","host":"uf0",
"source":"/opt/splunkforwarder/etc/apps/unlimited-speed/bin/heure.py",
"sourcetype":"heure","index":"cust0","fields":{"cust":"customer-unlimited"}}\n"""
status = destination.sendMultiLines(payload)
logging.debug(f"Archive sent status: {status}")