-
Notifications
You must be signed in to change notification settings - Fork 0
/
hlbmpwatcher
executable file
·251 lines (218 loc) · 9.55 KB
/
hlbmpwatcher
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
#!/usr/bin/python3
import argparse
import logging
import os
import re
import sys
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from PIL import Image
import concurrent.futures
if sys.platform == 'win32':
import winreg
def read_reg_str(ep, p = r"", k = ''):
try:
key = winreg.OpenKeyEx(ep, p)
value = winreg.QueryValueEx(key,k)
if key:
winreg.CloseKey(key)
return str(value[0])
except Exception as e:
return None
return None
def get_main_steam_path():
if sys.platform == 'win32':
steam_path = read_reg_str(ep = winreg.HKEY_LOCAL_MACHINE, p = r"SOFTWARE\Wow6432Node\Valve\Steam", k = 'InstallPath')
if steam_path is None or not os.path.exists(steam_path):
steam_path = read_reg_str(ep = winreg.HKEY_LOCAL_MACHINE, p = r"SOFTWARE\Valve\Steam", k = 'InstallPath')
return steam_path
else:
home_directory = os.path.expanduser('~')
steam_path = os.path.join( home_directory, '.steam', 'steam')
return steam_path
return None
def get_halflife_path(steam_storage_paths):
for storage_path in steam_storage_paths:
apps_path = os.path.join(storage_path, 'steamapps', 'common')
hl_path = os.path.join(apps_path, 'Half-Life')
if os.path.isdir(hl_path):
return hl_path
return None
def get_svencoop_path(steam_storage_paths):
for storage_path in steam_storage_paths:
apps_path = os.path.join(storage_path, 'steamapps', 'common')
sven_path = os.path.join(apps_path, 'Sven Co-op')
if os.path.isdir(sven_path):
return sven_path
return None
def screenshot_folders_from_storage_paths(steam_storage_paths):
paths = []
hl_path = get_halflife_path(steam_storage_paths)
if hl_path is not None:
for subitem in os.listdir(hl_path):
possible_mod_path = os.path.join(hl_path, subitem)
if os.path.isdir(possible_mod_path):
liblist_path = os.path.join(possible_mod_path, 'liblist.gam')
if os.path.isfile(liblist_path):
paths.append(possible_mod_path)
sven_path = get_svencoop_path(steam_storage_paths)
if sven_path is not None:
sven_screenshots_path = os.path.join(sven_path, 'svencoop', 'screenshots')
if os.path.isdir(sven_screenshots_path):
paths.append(sven_screenshots_path)
return paths
def get_steam_storage_paths(main_steam_path):
storage_paths = []
libraryfolders_path = os.path.join(main_steam_path, 'steamapps', 'libraryfolders.vdf')
try:
with open(libraryfolders_path, 'r') as libraryfolders_file:
regex = re.compile(r'\s*"path"\s*"(.+)"')
for line in libraryfolders_file.readlines():
result = regex.match(line)
if result:
storage_path = result.group(1)
if os.path.isdir(storage_path):
storage_paths.append(os.path.normpath(storage_path))
except Error:
pass
if len(storage_paths) == 0:
storage_paths.append(main_steam_path)
return storage_paths
def convert_to_png(img_path):
# error thrown if bmp file is not fully written yet
truncated_error_regex = re.compile('image file is truncated')
basepath, ext = os.path.splitext(img_path)
try:
with Image.open(img_path) as im:
png_path = basepath + '.png'
if not os.path.exists(png_path):
im.save(png_path)
logging.info("Converted '%s' to '%s' in '%s'", os.path.basename(img_path), os.path.basename(png_path), os.path.dirname(img_path))
except OSError as err:
errstr = str(err)
if truncated_error_regex.search(errstr) is None:
logging.error("Cannot convert '%s' to PNG: %s", img_path, str(err))
class ScreenshotHandler(FileSystemEventHandler):
@staticmethod
def on_modified(event):
if event.is_directory:
return None
elif event.event_type == 'modified' and event.src_path.endswith('.bmp'):
img_path = event.src_path
convert_to_png(img_path)
class ModfolderHandler(FileSystemEventHandler):
def __init__(self, observer, modwatches):
super().__init__()
self.observer = observer
self.modwatches = modwatches
def on_created(self, event):
if not event.is_directory:
return None
logging.info("Starting observing new directory: %s", event.src_path)
modwatches[event.src_path] = observer.schedule(ScreenshotHandler(), event.src_path, recursive=False)
def on_deleted(self, event):
if not event.is_directory:
return None
if event.src_path in modwatches:
logging.info("Removing directory from the watch list because it was deleted: %s", event.src_path)
observer.unschedule(modwatches[event.src_path])
del modwatches[event.src_path]
def on_moved(self, event):
if not event.is_directory:
return None
if event.src_path in modwatches:
logging.info("The directory renamed from '%s' to '%s'. Updating the watch.", event.src_path, event.dest_path)
observer.unschedule(modwatches[event.src_path])
del modwatches[event.src_path]
modwatches[event.dest_path] = observer.schedule(ScreenshotHandler(), event.dest_path, recursive=False)
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO, format='%(message)s')
arg_parser = argparse.ArgumentParser(
prog='Half-Life BMP watcher',
description='Automatically convert newly taken Half-Life screenshots from BMP to PNG')
arg_parser.add_argument('extra_paths', type=str, nargs='*', help='additional paths to watch for BMP files')
arg_parser.add_argument('--no-autodetect', action='store_true', help="don't autodetect Steam Half-Life and Sven Co-op screenshot paths")
arg_parser.add_argument('--dry-run', action='store_true', help="don't start observation, don't convert images, just print observed paths")
arg_parser.add_argument('--no-watch', action='store_true', help="don't start observation, print observed paths and convert existing bmp files if --convert-existing option is provided")
arg_parser.add_argument('--convert-existing', action='store_true', help="convert already existing bmp screenshots at the hlbmpwatcher startup")
args = arg_parser.parse_args()
autodetected_paths = []
hl_path = None
main_steam_path = None
if not args.no_autodetect:
main_steam_path = get_main_steam_path()
if main_steam_path is None:
print("Couldn't detect steam main path")
elif not os.path.isdir(main_steam_path):
print("Directory '%s' doesn't exist" % main_steam_path)
else:
steam_storage_paths = get_steam_storage_paths(main_steam_path)
hl_path = get_halflife_path(steam_storage_paths)
autodetected_paths = screenshot_folders_from_storage_paths(steam_storage_paths)
screenshot_paths = autodetected_paths
extra_paths = []
for path in args.extra_paths:
if os.path.isdir(path):
extra_paths.append(path)
else:
print("'%s' is not an existing directory." % path)
screenshot_paths.extend(set(extra_paths))
if len(screenshot_paths) == 0:
print("Nothing to do. Directories are not provided or do not exists. Exiting...")
sys.exit()
should_observe = not (args.no_watch or args.dry_run)
if hl_path is not None:
if should_observe:
print("Observing Half-Life directory for mod folders:")
else:
print("Half-Life directory:")
print("\t%s" % hl_path)
if should_observe:
print("Observing screenshot directories:")
else:
print("Screenshot directories:")
for path in screenshot_paths:
print("\t%s" % path)
conversion_executor = None if args.dry_run else concurrent.futures.ThreadPoolExecutor(max_workers=4)
if args.convert_existing:
bmp_screenshot_re = re.compile(r"[0-9]{4}\.bmp")
for path in screenshot_paths:
for f in os.listdir(path):
img_path = os.path.join(path, f)
if bmp_screenshot_re.search(img_path) is not None:
if args.dry_run:
logging.info("Would convert '%s' to PNG", img_path)
else:
conversion_executor.submit(convert_to_png, img_path)
if conversion_executor:
conversion_executor.shutdown(wait=True)
if args.no_watch or args.dry_run:
sys.exit()
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
force=True)
screenshot_handler = ScreenshotHandler()
observer = Observer()
modwatches = dict()
for path in screenshot_paths:
modwatches[path] = observer.schedule(screenshot_handler, path, recursive=False)
modfolder_handler = ModfolderHandler(observer, modwatches)
if hl_path is not None:
observer.schedule(modfolder_handler, hl_path, recursive=False)
observer.start()
try:
try:
while observer.is_alive():
observer.join(1)
except TypeError:
# support old versions
while True:
time.sleep(1)
except KeyboardInterrupt:
# just stop the loop
pass
finally:
observer.stop()
observer.join()