-
Notifications
You must be signed in to change notification settings - Fork 7
/
custom_m_codes.py
executable file
·68 lines (55 loc) · 2.68 KB
/
custom_m_codes.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#!/usr/bin/env python3
"""
Example of intercepting and interacting with codes
Make sure when running this script to have access to the DSF UNIX socket owned by the dsf user.
"""
import subprocess
import traceback
from dsf.connections import InterceptConnection
from dsf.commands.basecommands import MessageType
from dsf.commands.code import CodeType
from dsf.initmessages.clientinitmessages import InterceptionMode
def start_intercept():
filters = ["M1234", "M5678", "M7722"]
intercept_connection = InterceptConnection(InterceptionMode.PRE, filters=filters, debug=True)
intercept_connection.connect()
try:
while True:
# Wait for a code to arrive
cde = intercept_connection.receive_code()
# Check for the type of the code
if cde.type == CodeType.MCode and cde.majorNumber == 1234:
# --------------- BEGIN FLUSH ---------------------
# Flushing is only necessary if the action below needs to be in sync with the machine
# at this point in the GCode stream. Otherwise it can an should be skipped
# Flush the code's channel to be sure we are being in sync with the machine
success = intercept_connection.flush(cde.channel)
# Flushing failed so we need to cancel our code
if not success:
print("Flush failed")
intercept_connection.cancel_code()
continue
# -------------- END FLUSH ------------------------
# Do whatever needs to be done if this is the right code
print(cde, cde.flags)
# Resolve it so that DCS knows we took care of it
intercept_connection.resolve_code()
elif cde.type == CodeType.MCode and cde.majorNumber == 5678:
intercept_connection.resolve_code()
intercept_connection.close()
# Exit this example
return
elif cde.type == CodeType.MCode and cde.majorNumber == 7722:
# We are going to shut down the SBC in one minute
subprocess.run(["sudo", "shutdown", "+1"])
# Resolve it with a custom response message text
intercept_connection.resolve_code(MessageType.Warn, "Shutting down SBC in 1min...")
else:
# We did not handle it so we ignore it and it will be continued to be processed
intercept_connection.ignore_code()
except Exception as e:
print("Closing connection: ", e)
traceback.print_exc()
intercept_connection.close()
if __name__ == "__main__":
start_intercept()