-
Notifications
You must be signed in to change notification settings - Fork 0
/
KibanaBackup.py
210 lines (169 loc) · 6.46 KB
/
KibanaBackup.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
"""
Backup Kibana spaces and objects using the Kibana api
usage:
KibanaBackup.py --config=<filename>
"""
import json
import requests
import os, sys
import configparser
from docopt import docopt
import hashlib
import urllib3
urllib3.disable_warnings()
def GetKibanaAPI (config, url_api ):
headers = {'Content-Type': 'application/json',}
if config['tls']:
method = 'https://'
else:
method = 'http://'
if config['auth']:
authentication = config['username'] + ":" + config['password'] + '@'
else:
authentication = ''
try:
url = method + authentication + config['server'] + ":" + config['port'] + '/' + url_api
if config['tls']:
r = requests.get (url, headers=headers, verify=False)
else:
r = requests.get (url, headers=headers )
message = json.loads(r.text)
if r.status_code != 200:
raise
return message
except:
print ("Failed to get kibana api")
print (r.text)
return ""
def GetSpaceObject(config, space, Object, default):
headers = {'Content-Type': 'application/json', 'kbn-xsrf': 'true' }
if config['tls']:
method = 'https://'
else:
method = 'http://'
if config['auth']:
authentication = config['username'] + ":" + config['password'] + '@'
else:
authentication = ''
#There is different api call for the default space
if default:
api_query = '/api/saved_objects/_export'
else:
api_query = '/s/' + space + '/api/saved_objects/_export'
url = method + authentication + config['server'] + ":" + config['port'] + api_query
post_data = json.dumps({ 'type': Object })
try:
if config['tls']:
print ("not verifying the tls cert!! - feature not available yet")
r = requests.post (url, headers=headers, data=post_data, verify=False)
else:
r = requests.post (url, headers=headers, data=post_data )
#need to save export.ndjson file
if r.status_code == 200:
if r.headers['content-disposition']:
export = r.content
return export
except:
print ("Failed to export space objects")
return ""
def CalcChecksumFile(filename):
try:
BLOCKSIZE = 65536
hasher = hashlib.sha1()
with open(filename, 'rb') as afile:
buf = afile.read(BLOCKSIZE)
while len(buf) > 0:
hasher.update(buf)
buf = afile.read(BLOCKSIZE)
file_sha1 = hasher.hexdigest()
filesize = os.path.getsize(filename)
return file_sha1, filesize
except:
print ("Failed to calculate sha1 or filesize")
sys.exit(1)
def WriteFileJSON(config, FileName, message):
FilePath = config['backup_folder'] +'/' + FileName
with open(FilePath, 'w') as outfile:
json.dump(message, outfile)
#Write ndjson file from byte array
def WriteFileObject(config, FileName, message):
FilePath = config['backup_folder'] +'/' + FileName
outfile = open(FilePath, 'wb')
outfile.write(message)
def GetSpaceObjects(config, space, SeperateObjectTypes):
#which types of saved objects are exported - check this after Kibana is upgraded
SavedObjectTypes = ['config','url','index-pattern','query','visualization','canvas-element','canvas-workpad','graph-workspace','dashboard','map','search','tag','map','lens','infrastructure-ui-source','metrics-explorer-view','inventory-view','apm-indices']
#Export all objects in a space
if space == 'default': #default space has a different api url
message = GetSpaceObject(config, space, SavedObjectTypes, True)
else:
message = GetSpaceObject(config, space, SavedObjectTypes, False)
FileName = "SavedObjects_" + space + ".ndjson"
WriteFileObject(config, FileName, message)
#Export each object as a seperate file
if SeperateObjectTypes:
for Object in SavedObjectTypes:
print ("Exporting Object : %s in space : %s" % (Object, space))
if space == 'default': #default space has a different api url
message = GetSpaceObject(config, space, Object, True)
else:
message = GetSpaceObject(config, space, Object, False)
FileName = "SavedObjects_" + space + "_" + Object + ".ndjson"
WriteFileObject(config, FileName, message)
def GetSpaces(config, SeperateObjectTypes):
spaces_json = GetKibanaAPI(config, "api/spaces/space")
FileName = 'kibana_spaces.json'
WriteFileJSON(config, FileName, spaces_json)
for space in spaces_json:
print ("Export Objects for Space : %s" % space['id'])
GetSpaceObjects(config, space['id'], SeperateObjectTypes)
def GetRoles(config):
roles_json = GetKibanaAPI(config, "api/security/role")
FileName = 'kibana_roles.json'
WriteFileJSON(config, FileName, roles_json)
#load config
def LoadConfig(ConfigFile):
try:
if os.path.isfile(ConfigFile):
config = configparser.ConfigParser()
config.read(ConfigFile)
if 'KibanaBackup' in config._sections:
config_dict = {s:dict(config.items(s)) for s in config.sections()}
RequiredConfig = ['server', 'port', 'backup_folder', 'tls', 'auth']
for item in RequiredConfig:
if item not in config_dict['KibanaBackup']:
print ("Unable to verify configuration file")
print ("Missing %s from configuration file" % item)
return None
#convert tls True/False string to bool
if config_dict['KibanaBackup']['tls'] == 'True':
config_dict['KibanaBackup']['tls'] = True
else:
config_dict['KibanaBackup']['tls'] = False
if config_dict['KibanaBackup']['auth'] == 'True':
config_dict['KibanaBackup']['auth'] = True
else:
config_dict['KibanaBackup']['auth'] = False
return config_dict['KibanaBackup']
return None
except:
return None
def main():
#which types of saved objects are exported
SavedObjectTypes = ['config','url','index-pattern','query','visualization','canvas-element','canvas-workpad','graph-workspace','dashboard','map','search','tag','map','lens','infrastructure-ui-source','metrics-explorer-view','inventory-view','apm-indices']
options = docopt(__doc__)
if options['--config']:
ConfigFile=options['--config']
config = LoadConfig(ConfigFile)
if config is None:
print ("Failed to load config")
return
if not os.path.exists(config['backup_folder']):
os.mkdir(config['backup_folder'])
if os.path.exists(config['backup_folder']):
GetRoles(config)
#set to true if you want to export a seperate file for each object type in a space
SeperateObjectTypes = False
GetSpaces(config, SeperateObjectTypes)
if __name__ == "__main__":
main()