Permalink
Browse files

Safety Plugin: Add detection of Anycubic Mega

Identified by "Author: (Jolly, xxxxxxxx.CO.)" in startup messages.
  • Loading branch information...
foosel committed Apr 10, 2018
1 parent 85dfcbd commit 75de062b55c3aef7b250bf71c2ca5747860c3330
Showing with 34 additions and 10 deletions.
  1. +34 −10 src/octoprint/plugins/printer_safety_check/__init__.py
@@ -23,8 +23,14 @@
"""

ANETA8_FIRMWARE_NAME = "ANET_A8_".lower()
ANYCUBIC_AUTHOR = "| Author: (Jolly, xxxxxxxx.CO.)".lower()

SAFETY_CHECKS = {
"firmware-unsafe": (lambda name, data: name and name.lower().startswith("anet_a8_"),)
"firmware-unsafe": dict(m115=(lambda name, data: name and name.lower().startswith(ANETA8_FIRMWARE_NAME),),
received=(lambda line: line and ANYCUBIC_AUTHOR in line.lower(),),
message=u"Your printer's firmware is known to lack mandatory safety features (e.g. " \
u"thermal runaway protection). This is a fire risk.")
}

class PrinterSafetyCheckPlugin(octoprint.plugin.AssetPlugin,
@@ -35,6 +41,7 @@ class PrinterSafetyCheckPlugin(octoprint.plugin.AssetPlugin,
# noinspection PyMissingConstructor
def __init__(self):
self._warnings = dict()
self._scan_received = True

##~~ TemplatePlugin API

@@ -58,7 +65,8 @@ def get_assets(self):

def on_event(self, event, payload):
if event == Events.FIRMWARE_DATA:
self._analyze_firmware_data(payload.get("name"), payload.get("data"))
self._run_checks("m115", payload.get("name"), payload.get("data"))
self._scan_received = False
elif event == Events.DISCONNECTED:
self._reset_warnings()

@@ -69,23 +77,36 @@ def on_api_get(self, request):
return flask.make_response("Insufficient rights", 403)
return flask.jsonify(self._warnings)

##~~ GCODE received hook handler

def on_gcode_received(self, comm_instance, line, *args, **kwargs):
if self._scan_received:
self._run_checks("received", line)
return line

##~~ Helpers

def _analyze_firmware_data(self, name, data):
def _run_checks(self, check_type, *args, **kwargs):
changes = False

for warning_type, checks in SAFETY_CHECKS.items():
if any(x(name, data) for x in checks):
message = u"Your printer's firmware is known to lack mandatory safety features (e.g. " \
u"thermal runaway protection). This is a fire risk."
self._log_to_terminal(TERMINAL_SAFETY_WARNING.format(message="\n".join(textwrap.wrap(message, 75)),
warning_type=warning_type))
self._warnings[warning_type] = message
for warning_type, check_data in SAFETY_CHECKS.items():
checks = check_data.get(check_type)
message = check_data.get("message")
if not checks or not message:
continue

if any(x(*args, **kwargs) for x in checks):
self._register_warning(warning_type, message)
changes = True

if changes:
self._ping_clients()

def _register_warning(self, warning_type, message):
self._log_to_terminal(TERMINAL_SAFETY_WARNING.format(message="\n".join(textwrap.wrap(message, 75)),
warning_type=warning_type))
self._warnings[warning_type] = message

def _reset_warnings(self):
self._warnings.clear()
self._ping_clients()
@@ -107,4 +128,7 @@ def _ping_clients(self):
"issue and inform you about that fact.")
__plugin_license__ = "AGPLv3"
__plugin_implementation__ = PrinterSafetyCheckPlugin()
__plugin_hooks__ = {
"octoprint.comm.protocol.gcode.received": __plugin_implementation__.on_gcode_received
}

0 comments on commit 75de062

Please sign in to comment.