-
Notifications
You must be signed in to change notification settings - Fork 1
/
__init__.py
513 lines (429 loc) · 20.8 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
# coding=utf-8
from __future__ import absolute_import, unicode_literals
from concurrent.futures import ThreadPoolExecutor
from octoprint.printer.estimation import PrintTimeEstimator
import octoprint.plugin
import octoprint.events
import octoprint.filemanager.storage
import re
import sarge
import io
import time
import os
import sys
from octoprint.filemanager.analysis import AnalysisAborted
from octoprint.filemanager.analysis import GcodeAnalysisQueue
from octoprint.printer.estimation import PrintTimeEstimator
class SlicerEstimator(PrintTimeEstimator):
    """Print time estimator that prefers a slicer-provided remaining time.

    Attributes:
        estimated_time: Remaining time taken from the slicer data. ``-1`` is
            the initial "nothing known" marker; ``None`` is treated the same
            way. The value is assigned from outside this class.
        average_prio: When true, a standard estimate of type ``"average"``
            wins over the slicer value.
    """

    def __init__(self, job_type):
        PrintTimeEstimator.__init__(self, job_type)
        self._job_type = job_type
        self.estimated_time = -1
        self.average_prio = False

    def estimate(self, progress, printTime, cleanedPrintTime, statisticalTotalPrintTime, statisticalTotalPrintTimeType):
        """Return the estimate as a ``(value, origin)`` tuple.

        The standard estimator is always evaluated first. Its result is
        returned unchanged when the job is not a ``"local"`` one, when no
        slicer value is available, or when it is of type ``"average"`` and
        ``average_prio`` is set. Otherwise the slicer value is returned with
        the origin ``"slicerestimator"``.
        """
        std_estimator = PrintTimeEstimator.estimate(self, progress, printTime, cleanedPrintTime, statisticalTotalPrintTime, statisticalTotalPrintTimeType)

        # None can be assigned by a caller whose parsing failed; handing it on
        # as a "slicer" estimate would hide the usable standard estimate.
        has_slicer_value = self.estimated_time is not None and self.estimated_time != -1

        if self._job_type != "local" or not has_slicer_value:
            # using standard estimator
            return std_estimator
        if std_estimator[1] == "average" and self.average_prio:
            # average more important than estimation
            return std_estimator
        # return "slicerestimator" as Origin of estimation
        return self.estimated_time, "slicerestimator"
class SlicerEstimatorPlugin(octoprint.plugin.StartupPlugin,
                            octoprint.plugin.TemplatePlugin,
                            octoprint.plugin.SettingsPlugin,
                            octoprint.plugin.EventHandlerPlugin,
                            octoprint.plugin.ProgressPlugin,
                            octoprint.plugin.AssetPlugin,
                            octoprint.plugin.SimpleApiPlugin):
    """Plugin that feeds the slicer's print time into the print time estimator.

    The time is read either from commands sent to the printer (search mode
    ``"GCODE"``) or from a line found in the file (search mode ``"COMMENT"``).
    In addition, ``;Slicer info:`` lines of added files are stored as file
    metadata, and an analysis queue is offered that takes the slicer time for
    uploaded files.
    """

    def __init__(self):
        self._estimator = None
        self._slicer_estimation = None
        self._slicer_estimation_str = None
        self._executor = ThreadPoolExecutor()

        # Inert values for everything _update_settings_from_config() loads
        # later, so hooks and events that arrive before on_after_startup()
        # find the attributes instead of raising AttributeError.
        self._slicer_conf = "2"
        self._slicer_auto = False
        self._average_prio = False
        self.estimate_upload = False
        self._add_slicer_metadata = False
        self._useDevChannel = False

        # Slicer defaults - actual Cura M117, PrusaSlicer, Cura, Simplify3D
        # Columns: gcode, pw, pd, ph, pm, ps, pwp, pdp, php, pmp, psp,
        #          search_mode, search_pattern (see _set_slicer_settings)
        self._slicer_def = [
            ["M117", "", "",
             "M117 Time Left ([0-9]+)h([0-9]+)m([0-9]+)s",
             "M117 Time Left ([0-9]+)h([0-9]+)m([0-9]+)s",
             "M117 Time Left ([0-9]+)h([0-9]+)m([0-9]+)s",
             1, 1, 1, 2, 3, "GCODE", "M117 Time Left ([0-9]+)h([0-9]+)m([0-9]+)s"],
            ["M73", "", "",
             "",
             "M73 P([0-9]+) R([0-9]+).*",
             "",
             1, 1, 1, 2, 1, "GCODE", "M73 P([0-9]+) R([0-9]+).*"],
            ["", "", "",
             "",
             "",
             ";TIME:([0-9]+)",
             1, 1, 1, 1, 1, "COMMENT", ";TIME:([0-9]+)"],
            ["", "", "",
             "; Build time: ([0-9]+) hours? ([0-9]+) minutes",
             "; Build time: ([0-9]+) hours? ([0-9]+) minutes",
             "",
             1, 1, 1, 2, 1, "COMMENT", "; Build time: ([0-9]+) hours? ([0-9]+) minutes"]]

        # Start with the table entry that matches the "slicer" settings default.
        self._set_slicer_settings(2)

    # SECTION: Settings
    def on_after_startup(self):
        """Log the startup and load the stored settings."""
        self._logger.info("Started up SlicerEstimator")
        self._update_settings_from_config()

    def get_settings_defaults(self):
        """Return the default settings.

        ``pw``/``pd``/``ph``/``pm``/``ps`` are the patterns for weeks, days,
        hours, minutes and seconds; the matching ``*p`` entries are the regex
        group numbers read from them (see ``_parseEstimation``).
        """
        return dict(slicer="2",
                    slicer_gcode="M117",
                    pw="",
                    pd="",
                    ph="M117 Time Left ([0-9]+)h([0-9]+)m([0-9]+)s",
                    pm="M117 Time Left ([0-9]+)h([0-9]+)m([0-9]+)s",
                    ps="M117 Time Left ([0-9]+)h([0-9]+)m([0-9]+)s",
                    pwp=1,
                    pdp=1,
                    php=1,
                    pmp=2,
                    psp=3,
                    search_mode="GCODE",
                    search_pattern="",
                    average_prio=False,
                    use_assets=True,
                    slicer_auto=True,
                    estimate_upload=True,
                    add_slicer_metadata=True,
                    useDevChannel=False)

    def on_settings_save(self, data):
        """Save the settings and re-read them into the plugin attributes."""
        octoprint.plugin.SettingsPlugin.on_settings_save(self, data)
        self._update_settings_from_config()

    def get_template_configs(self):
        """Register the settings template (without custom bindings)."""
        return [
            dict(type="settings", custom_bindings=False)
        ]

    # SECTION: Settings helper
    def _update_settings_from_config(self):
        """Copy the stored settings into attributes and activate the slicer."""
        self._slicer_conf = self._settings.get(["slicer"])
        self._logger.debug("SlicerEstimator: Slicer Setting {}".format(self._slicer_conf))
        self._slicer_auto = self._settings.get(["slicer_auto"])
        self._average_prio = self._settings.get(["average_prio"])
        self.estimate_upload = self._settings.get(["estimate_upload"])
        self._add_slicer_metadata = self._settings.get(["add_slicer_metadata"])
        self._useDevChannel = self._settings.get(["useDevChannel"])

        if self._estimator is not None:
            self._estimator.average_prio = self._average_prio
        self._logger.debug("Average: {}".format(self._average_prio))

        self._apply_configured_slicer()

    def _apply_configured_slicer(self):
        """Activate the slicer chosen in the settings.

        ``"c"`` selects the custom patterns stored in the settings; any other
        value is used as an index into the built-in slicer table.
        """
        if self._slicer_conf == "c":
            self._slicer = self._slicer_conf
            self._slicer_gcode = self._settings.get(["slicer_gcode"])
            # "or ''" keeps re.compile() from failing on an unset value.
            self._pw = re.compile(self._settings.get(["pw"]) or "")
            self._pd = re.compile(self._settings.get(["pd"]) or "")
            self._ph = re.compile(self._settings.get(["ph"]) or "")
            self._pm = re.compile(self._settings.get(["pm"]) or "")
            self._ps = re.compile(self._settings.get(["ps"]) or "")
            self._pwp = int(self._settings.get(["pwp"]))
            self._pdp = int(self._settings.get(["pdp"]))
            self._php = int(self._settings.get(["php"]))
            self._pmp = int(self._settings.get(["pmp"]))
            self._psp = int(self._settings.get(["psp"]))
            self._search_mode = self._settings.get(["search_mode"])
            self._search_pattern = self._settings.get(["search_pattern"])
            return

        try:
            self._set_slicer_settings(int(self._slicer_conf))
        except (TypeError, ValueError, IndexError):
            self._logger.warning("SlicerEstimator: invalid slicer setting {}, using default".format(self._slicer_conf))
            self._set_slicer_settings(2)

    def _set_slicer_settings(self, slicer):
        """Activate entry ``slicer`` of the built-in slicer table."""
        # Unpack first so a bad index cannot leave the attributes half updated.
        (gcode, pw, pd, ph, pm, ps,
         pwp, pdp, php, pmp, psp,
         search_mode, search_pattern) = self._slicer_def[int(slicer)]

        self._slicer = slicer
        self._slicer_gcode = gcode
        self._pw = re.compile(pw)
        self._pd = re.compile(pd)
        self._ph = re.compile(ph)
        self._pm = re.compile(pm)
        self._ps = re.compile(ps)
        self._pwp = pwp
        self._pdp = pdp
        self._php = php
        self._pmp = pmp
        self._psp = psp
        self._search_mode = search_mode
        self._search_pattern = search_pattern

    # SECTION: Estimation
    def updateGcodeEstimation(self, comm_instance, phase, cmd, cmd_type, gcode, *args, **kwargs):
        """Hook handler: take the estimate from a matching sent command.

        Only active in search mode ``"GCODE"`` and only for the command code
        configured for the active slicer.
        """
        if self._estimator is None:
            return

        if self._search_mode == "GCODE" and gcode and gcode == self._slicer_gcode:
            self._logger.debug("SlicerEstimator: {} found - {}".format(gcode, cmd))
            estimated_time = self._parseEstimation(cmd)
            if estimated_time:
                self._estimator.estimated_time = estimated_time

    # calculate estimation on print progress
    def on_print_progress(self, storage, path, progress):
        """Scale the slicer time found in the file down by ``progress`` percent."""
        if self._search_mode != "COMMENT" or not self._slicer_estimation:
            return
        if self._estimator is None:
            # No estimator has been created yet, nothing to update.
            return
        self._estimator.estimated_time = self._slicer_estimation - (self._slicer_estimation * progress * 0.01)
        self._logger.debug("SlicerEstimator: {}sec".format(self._estimator.estimated_time))

    # estimator factory hook
    def estimator_factory(self):
        """Return a factory that creates and remembers a ``SlicerEstimator``."""
        def factory(*args, **kwargs):
            self._estimator = SlicerEstimator(*args, **kwargs)
            self._estimator.average_prio = self._average_prio
            return self._estimator
        return factory

    # EventHandlerPlugin for native information search
    def on_event(self, event, payload):
        """React to print start/end and to added files.

        * Print started from local storage: select the slicer and, in search
          mode ``"COMMENT"``, search the file in a background thread.
        * Print cancelled/failed/done: reset the estimation state.
        * File added (if enabled): store the slicer metadata of local gcode
          files.
        """
        events = octoprint.events.Events

        if event == events.PRINT_STARTED:
            if payload["origin"] == "local":
                self._set_slicer(payload["origin"], payload["path"])
                if self._search_mode == "COMMENT":
                    self._logger.debug("Search started in file {}".format(payload["path"]))
                    self._executor.submit(
                        self._search_slicer_comment_file, payload["origin"], payload["path"]
                    )

        if event in (events.PRINT_CANCELLED, events.PRINT_FAILED, events.PRINT_DONE):
            # Init of Class variables for new estimation
            self._slicer_estimation = None
            self._slicer_estimation_str = None
            if self._estimator is not None:
                self._estimator.estimated_time = -1
            self._logger.debug("Event received: {}".format(event))

        if event == events.FILE_ADDED and self._add_slicer_metadata:
            file_type = payload.get("type") or []
            if payload.get("storage") == "local" and len(file_type) > 1 and file_type[1] == "gcode":
                self._logger.debug("File uploaded and will be scanned for Metadata")
                self._find_metadata(payload["storage"], payload["path"])

    # SECTION: File metadata
    # search for material data
    def _find_metadata(self, origin, path):
        """Store the ``;Slicer info:`` lines of a file as ``"slicer"`` metadata.

        Only the first 5000 lines of the file are searched. Each matching line
        is expected to look like ``;Slicer info:<key>;<Displayname>;<Value>``
        and is stored as ``{key: [displayname, value]}``.
        """
        # Format: ;Slicer info:<key>;<Displayname>;<Value>
        prefix = ";Slicer info:"
        results = self._search_in_file_start_all(origin, path, prefix, 5000)
        if results is None:
            return

        filament = dict()
        for result in results:
            # Cut the prefix off by length: str.lstrip() would treat it as a
            # set of characters and also eat the start of keys like "filament".
            slicer_info = result[len(prefix):].lstrip().split(";")
            if len(slicer_info) < 3:
                self._logger.debug("Ignoring malformed slicer info line: {}".format(result.strip()))
                continue
            filament[slicer_info[0]] = [slicer_info[1].strip(), slicer_info[2].strip()]

        storage = self._file_manager._storage_managers[origin]
        storage.set_additional_metadata(path, "slicer", filament, overwrite=True)
        self._logger.debug(storage.get_additional_metadata(path, "slicer"))

    # SECTION: Estimation helper
    # set the slicer before starting the print, fallback to config if fails
    def _set_slicer(self, origin, path):
        """Select the slicer for a file.

        With auto selection enabled the slicer detected in the file is used;
        if detection is disabled or finds nothing, the configured slicer
        (including the custom one) is activated.
        """
        if self._slicer_auto:
            slicer_detected = self._detect_slicer(origin, path)
            if slicer_detected:
                self._set_slicer_settings(slicer_detected)
                return
        self._apply_configured_slicer()

    # slicer auto selection
    def _detect_slicer(self, origin, path):
        """Return the slicer table index detected in the file, or ``None``."""
        line = self._search_in_file_regex(origin, path, ".*(PrusaSlicer|Simplify3D|Cura_SteamEngine).*")
        if line:
            if "Cura_SteamEngine" in line:
                self._logger.info("Detected Cura")
                return 2
            elif "PrusaSlicer" in line:
                self._logger.info("Detected PrusaSlicer")
                return 1
            elif "Simplify3D" in line:
                self._logger.info("Detected Simplify3D")
                return 3
        else:
            self._logger.warning("Autoselection of slicer not successful!")
        return None

    def _parseEstimation(self, cmd):
        """Convert a command or comment line into a number of seconds.

        Each non-empty pattern (weeks, days, hours, minutes, seconds) is
        matched against ``cmd`` and the configured group is read from it.
        Returns ``None`` when no pattern matches or a matched group cannot be
        read as a number.
        """
        units = (
            (self._pw, self._pwp, 7 * 24 * 60 * 60),
            (self._pd, self._pdp, 24 * 60 * 60),
            (self._ph, self._php, 60 * 60),
            (self._pm, self._pmp, 60),
            (self._ps, self._psp, 1),
        )

        amounts = []
        found = False
        for regex, group, _seconds in units:
            match = regex.match(cmd) if regex.pattern != "" else None
            if not match:
                amounts.append(0)
                continue
            found = True
            try:
                amounts.append(float(match.group(group)))
            except (IndexError, TypeError, ValueError):
                # The group number does not fit the pattern, or the group
                # holds no number - possible with custom patterns.
                self._logger.warning("SlicerEstimator: group {} of pattern {} is not usable in {}".format(group, regex.pattern, cmd))
                return None

        if not found:
            self._logger.debug("SlicerEstimator: unknown cmd {}".format(cmd))
            return None

        self._logger.debug("SlicerEstimator: Weeks {}, Days {}, Hours {}, Minutes {}, Seconds {}".format(*amounts))
        estimated_time = sum(amount * unit[2] for amount, unit in zip(amounts, units))
        self._logger.debug("SlicerEstimator: {}sec".format(estimated_time))
        return estimated_time

    # file search slicer comment
    def _search_slicer_comment_file(self, origin, path):
        """Search the file for the slicer time and hand it to the estimator.

        Runs in the executor's worker thread, so errors are logged here; the
        returned future is not inspected by the caller.
        """
        self._slicer_estimation = None
        try:
            slicer_estimation_str = self._search_in_file_regex(origin, path, self._search_pattern)
            if slicer_estimation_str:
                self._logger.debug("Slicer-Comment {} found.".format(slicer_estimation_str))
                estimation = self._parseEstimation(slicer_estimation_str)
                self._slicer_estimation = estimation
                # Do not overwrite the estimator's value with None when the
                # line could not be parsed.
                if estimation is not None and self._estimator is not None:
                    self._estimator.estimated_time = estimation
            else:
                self._logger.warning("Slicer-Comment not found. Please check if you selected the correct slicer.")
        except Exception:
            self._logger.exception("Search for the slicer comment in {} failed".format(path))

    # generic file search with RegEx
    def _search_in_file_regex(self, origin, path, pattern, rows=0):
        """Return the first line matching ``pattern`` (from its start), or ``None``.

        ``rows`` limits the search to that many lines; ``0`` searches the
        whole file.
        """
        path_on_disk = self._file_manager.path_on_disk(origin, path)
        self._logger.debug("Path on disc searched: {}".format(path_on_disk))
        compiled = re.compile(pattern)
        with io.open(path_on_disk, mode="r", encoding="utf8", errors="replace") as f:
            for line_no, line in enumerate(f, 1):
                if compiled.match(line):
                    return line
                if rows > 0 and line_no >= rows:
                    break
        return None

    # generic file search and find all occurences beginning with
    def _search_in_file_start_all(self, origin, path, pattern, rows=0):
        """Return all lines that start with the string ``pattern``.

        ``rows`` limits the search to that many lines; ``0`` searches the
        whole file.
        """
        path_on_disk = self._file_manager.path_on_disk(origin, path)
        self._logger.debug("Path on disc searched: {}".format(path_on_disk))
        return_arr = []
        with io.open(path_on_disk, mode="r", encoding="utf8", errors="replace") as f:
            for line_no, line in enumerate(f, 1):
                if line.startswith(pattern):
                    return_arr.append(line)
                if rows > 0 and line_no >= rows:
                    break
        return return_arr

    # SECTION: Analysis Queue Estimation (file upload)
    def analysis_queue_factory(self, *args, **kwargs):
        """Hook handler: provide the analysis queue used for gcode files."""
        return dict(gcode=lambda finished_callback: SlicerEstimatorGcodeAnalysisQueue(finished_callback, self))

    def run_analysis(self, path):
        """Return the slicer time found in a local file, or ``None``."""
        self._set_slicer("local", path)
        self._logger.debug("Search started in file {}".format(path))
        slicer_estimation_str = self._search_in_file_regex("local", path, self._search_pattern)
        if slicer_estimation_str:
            self._logger.debug("Slicer-Estimation {} found.".format(slicer_estimation_str))
            return self._parseEstimation(slicer_estimation_str)
        self._logger.warning("Slicer-Estimation not found. Please check if you selected the correct slicer.")
        return None

    # SECTION: API
    # def get_api_commands(self):
    #     return dict(get_filament_data = [])

    # def on_api_command(self, command, data):
    #     import flask
    #     import json
    #     from octoprint.server import user_permission
    #     if not user_permission.can():
    #         return flask.make_response("Insufficient rights", 403)

    #     if command == "get_filament_data":
    #         FileList = self._file_manager.list_files(recursive=True)
    #         self._logger.debug(FileList)
    #         localfiles = FileList["local"]
    #         results = filament_key = dict()
    #         for key, file in localfiles.items():
    #             if localfiles[key]["type"] == 'machinecode':
    #                 filament_meta = self._file_manager._storage_managers['local'].get_additional_metadata(localfiles[key]["path"] ,"filament")
    #                 results[localfiles[key]["path"]] = filament_meta
    #         return flask.jsonify(results)

    # SECTION: Assets
    def get_assets(self):
        """Return the asset files of the plugin."""
        # Define your plugin's asset files to automatically include in the
        # core UI here.
        self._logger.debug("Assets registered")
        return dict(
            js=["js/SlicerEstimator.js"],
            css=["css/SlicerEstimator.css"],
            less=["less/SlicerEstimator.less"]
        )

    # SECTION: software update hook
    def get_update_information(self):
        """Hook handler: return the software update configuration.

        With ``useDevChannel`` set, the ``Development`` branch is tracked by
        commit and installed through an update script; otherwise GitHub
        releases are tracked and installed with pip.
        """
        if self._settings.get_boolean(["useDevChannel"]):
            return dict(
                SlicerEstimator=dict(
                    displayName=self._plugin_name + " (Development Branch)",
                    displayVersion=self._plugin_version,

                    # version check: github repository
                    type="github_commit",
                    user="NilsRo",
                    repo="OctoPrint-SlicerEstimator",
                    branch="Development",

                    # update method: pip
                    # pip="https://github.com/NilsRo/OctoPrint-SlicerEstimator/archive/{target_version}.zip"
                    method="update_script",
                    update_script="{python} -m pip --disable-pip-version-check install https://github.com/NilsRo/OctoPrint-SlicerEstimator/archive/refs/heads/Development.zip --force-reinstall --no-deps --no-cache-dir",
                    checkout_folder=os.path.dirname(os.path.realpath(sys.executable)),
                    restart="octoprint"
                )
            )

        return dict(
            SlicerEstimator=dict(
                displayName=self._plugin_name,
                displayVersion=self._plugin_version,

                # version check: github repository
                type="github_release",
                user="NilsRo",
                repo="OctoPrint-SlicerEstimator",
                current=self._plugin_version,

                # stable release
                stable_branch=dict(
                    name="Stable",
                    branch="master",
                    # The key was misspelled "comittish" before.
                    commitish=["master"]
                ),

                # update method: pip
                pip="https://github.com/NilsRo/OctoPrint-SlicerEstimator/archive/{target_version}.zip"
            )
        )
# SECTION: Analysis Queue Class
class SlicerEstimatorGcodeAnalysisQueue(GcodeAnalysisQueue):
    """Gcode analysis queue that replaces the estimate with the slicer's one.

    After the standard analysis, the plugin's ``run_analysis`` is executed on
    the plugin's executor; if it returns a value, that value becomes the
    ``estimatedPrintTime`` of the analysis result.
    """

    def __init__(self, finished_callback, plugin):
        super(SlicerEstimatorGcodeAnalysisQueue, self).__init__(finished_callback)
        self._plugin = plugin
        self._result_slicer = None

    def _do_analysis(self, high_priority=False):
        """Run the standard analysis and apply the slicer time if one is found.

        Raises:
            AnalysisAborted: if the standard analysis is aborted, or an abort
                is requested while the slicer search is still running.
        """
        try:  # run a standard analysis and update estimation if found in GCODE
            result = super(SlicerEstimatorGcodeAnalysisQueue, self)._do_analysis(high_priority)
            if self._plugin.estimate_upload and not self._aborted:
                future = self._plugin._executor.submit(
                    self._run_analysis, self._current.path
                )
                # Break analysis if abort requested
                while not future.done() and not self._aborted:
                    time.sleep(1)

                if not future.done():
                    # The loop only ends early on an abort. Future has no
                    # shutdown(); cancel() is the available call. It cannot
                    # stop a search that is already running, but its result
                    # is then simply never used.
                    future.cancel()
                    raise AnalysisAborted(reenqueue=self._reenqueue)

                # Read the outcome from this very future instead of the shared
                # attribute, which a still running search of an earlier,
                # aborted analysis might overwrite.
                error = future.exception()
                if error is not None:
                    self._logger.error("Slicer estimation for file {} failed: {}".format(self._current.name, error))
                else:
                    slicer_time = future.result()
                    if slicer_time:
                        self._logger.info("Found {}s from slicer for file {}".format(slicer_time, self._current.name))
                        result["estimatedPrintTime"] = slicer_time
            return result
        except AnalysisAborted as _:
            self._logger.info("Probably starting printing, aborting analysis of file-upload.")
            raise

    def _do_abort(self, reenqueue=True):
        """Forward the abort request to the standard implementation."""
        super(SlicerEstimatorGcodeAnalysisQueue, self)._do_abort(reenqueue)

    def _run_analysis(self, path):
        """Run the plugin's search; store the result and also return it."""
        self._result_slicer = self._plugin.run_analysis(path)
        return self._result_slicer
# Plugin registration: module-level names defined for the plugin host
# (presumably read by OctoPrint's plugin loader - confirm against its docs).
__plugin_name__ = "Slicer Print Time Estimator"
__plugin_pythoncompat__ = ">=2.7,<4" # python 2 and 3
# One plugin instance is created at import time; every hook handler below is
# a bound method of this same instance.
__plugin_implementation__ = SlicerEstimatorPlugin()
# Hook name -> handler that serves it.
__plugin_hooks__ = {
"octoprint.comm.protocol.gcode.sent": __plugin_implementation__.updateGcodeEstimation,
"octoprint.printer.estimation.factory": __plugin_implementation__.estimator_factory,
"octoprint.filemanager.analysis.factory": __plugin_implementation__.analysis_queue_factory,
"octoprint.plugin.softwareupdate.check_config": __plugin_implementation__.get_update_information
}