/
custom_pipelines.py
79 lines (68 loc) · 2.17 KB
/
custom_pipelines.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import requests
from typing import Any
# API key the send_* helpers place in the "Authorization: ApiKey ..." header.
# The value here is a placeholder and must be replaced before running.
# NOTE(review): consider loading the real key from the environment instead of
# committing it to the file.
APIKEY = "<api key>"
# Base URL of the cluster. The helpers append the request path directly to
# this string with no separator, so keep the trailing "/".
CLUSTER_URL = "https://<cluster name>.es.us-central1.gcp.cloud.es.io/"
def send_post(uri: str, body: dict) -> dict:
    """POST ``body`` as a JSON document to the cluster and return the JSON reply.

    Args:
        uri: Path appended directly to ``CLUSTER_URL`` (no separator is added).
        body: Payload to send; it is serialized as JSON.

    Returns:
        The response body decoded from JSON. The HTTP status is not checked,
        so error responses are returned the same way as successful ones.
    """
    headers = {
        "Authorization": f"ApiKey {APIKEY}",
        "Content-Type": "application/json",
    }
    url = f"{CLUSTER_URL}{uri}"
    # Use json= rather than data=: data= would form-encode the dict instead of
    # sending a JSON document, unlike send_put which already sends JSON.
    r = requests.post(url=url, headers=headers, json=body)
    return r.json()
def send_put(uri: str, input: dict) -> dict:
    """PUT ``input`` as a JSON document to the cluster and return the JSON reply.

    Args:
        uri: Path appended directly to ``CLUSTER_URL`` (no separator is added).
        input: Payload to send; it is serialized as JSON.

    Returns:
        The response body decoded from JSON. The HTTP status is not checked.
    """
    response = requests.put(
        url=f"{CLUSTER_URL}{uri}",
        headers={
            "Authorization": f"ApiKey {APIKEY}",
            "Content-Type": "application/json",
        },
        json=input,
    )
    return response.json()
def send_get(uri: str) -> dict:
    """GET ``uri`` from the cluster and return the JSON reply.

    Args:
        uri: Path appended directly to ``CLUSTER_URL`` (no separator is added).

    Returns:
        The response body decoded from JSON. The HTTP status is not checked.
    """
    response = requests.get(
        url=f"{CLUSTER_URL}{uri}",
        headers={"Authorization": f"ApiKey {APIKEY}"},
    )
    return response.json()
def find_cutom_pipeline_processor(pipeline: dict[str, Any]) -> str:
    """Return the name of the first ``@custom`` pipeline a fleet pipeline calls.

    Args:
        pipeline: A single ingest pipeline definition (the value side of the
            mapping returned by the ``_ingest/pipeline`` endpoint).

    Returns:
        The ``name`` of the first ``pipeline`` processor whose name ends with
        ``"@custom"``, or ``""`` when the pipeline is not marked as managed by
        fleet or has no such processor.
    """
    # Missing "_meta" or "managed_by" simply means "not a fleet pipeline";
    # neither should raise.
    meta = pipeline.get("_meta") or {}
    if meta.get("managed_by") != "fleet":
        return ""

    for processor in pipeline.get("processors") or []:
        target = processor.get("pipeline")
        if not target:
            # Not a pipeline-delegation processor (e.g. set, rename, ...).
            continue
        name = target.get("name", "")
        if name.endswith("@custom"):
            return name
    return ""
def get_custom_pipelines() -> list:
    """Collect the distinct ``@custom`` pipeline names used by ``logs-*`` pipelines.

    Fetches all ingest pipelines from the cluster, looks only at those whose
    name starts with ``"logs-"``, and gathers the custom pipeline name each
    one reports via ``find_cutom_pipeline_processor``.

    Returns:
        A list of unique names, in no particular order.
    """
    all_pipelines = send_get('_ingest/pipeline')
    unique_names = set()
    for pipeline_name, definition in all_pipelines.items():
        if not pipeline_name.startswith("logs-"):
            continue
        custom_name = find_cutom_pipeline_processor(definition)
        if custom_name:
            unique_names.add(custom_name)
    return list(unique_names)
def create_pipeline(pipeline: str) -> dict:
    """PUT an ingest pipeline named ``pipeline`` that delegates to ``process_local_events``.

    The pipeline body holds a single ``pipeline`` processor, conditioned on
    the document's data-stream namespace being ``local_malicious``.

    NOTE(review): the PUT presumably replaces any existing pipeline with the
    same name — confirm that overwriting is intended.

    Args:
        pipeline: Name of the pipeline to write.

    Returns:
        The decoded JSON response from the cluster.
    """
    delegate = {
        "name": "process_local_events",
        "if": "ctx['data_stream']['namespace'] == 'local_malicious'",
    }
    definition = {"processors": [{"pipeline": delegate}]}
    return send_put(f"_ingest/pipeline/{pipeline}", definition)
def main():
    """Create every discovered custom pipeline, printing each name and API reply."""
    for name in get_custom_pipelines():
        print(name)
        outcome = create_pipeline(name)
        print(outcome)


# Only run when executed as a script, not when imported.
if __name__ == "__main__":
    main()