forked from Ape/samsungctl
-
Notifications
You must be signed in to change notification settings - Fork 7
/
remote_websocket.py
93 lines (73 loc) · 2.87 KB
/
remote_websocket.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import base64
import json
import logging
import socket
import time
import ssl
import os
from . import exceptions
# Websocket endpoint templates for the TV's remote-control channel.
# Placeholders, in order: host, port, base64-encoded client name.
_CHANNEL_PATH = "{}:{}/api/v2/channels/samsung.remote.control?name={}"
URL_FORMAT = "ws://" + _CHANNEL_PATH  # plain websocket
SSL_URL_FORMAT = "wss://" + _CHANNEL_PATH  # websocket over TLS
class RemoteWebsocket:
    """Remote-control connection to a TV over a websocket.

    The connection is opened in ``__init__`` and the first message from the
    TV is read immediately. Instances can be used as context managers; the
    socket is closed on exit.
    """

    # Seconds to pause after each key press sent by ``control``.
    _key_interval = 0.5

    def __init__(self, config):
        """Open the websocket and read the TV's first response.

        Args:
            config: Mapping with the keys ``host``, ``port``, ``name`` and
                ``timeout``. It is normalised in place: a falsy ``port``
                becomes ``8001`` and a ``timeout`` of ``0`` becomes ``None``.
                Port ``8002`` selects the TLS endpoint and sends the stored
                token, if any.

        Raises:
            exceptions.UnhandledResponse: If the first message from the TV is
                not a ``ms.channel.connect`` event.
        """
        # Imported here rather than at module level, so the dependency is
        # only required once a websocket remote is actually created.
        import websocket

        self.connection = None
        self.token_file = os.path.join(
            os.path.dirname(os.path.realpath(__file__)), "token.txt")

        if not config["port"]:
            config["port"] = 8001
        if config["timeout"] == 0:
            config["timeout"] = None

        encoded_name = self._serialize_string(config["name"])
        if config["port"] == 8002:
            url = SSL_URL_FORMAT.format(
                config["host"], config["port"], encoded_name)
            token = self._load_token()
            if token:
                url += "&token=" + token
            # Certificate verification is disabled for this connection
            # (presumably the TV presents a self-signed certificate).
            self.connection = websocket.create_connection(
                url, config["timeout"], sslopt={"cert_reqs": ssl.CERT_NONE})
        else:
            url = URL_FORMAT.format(
                config["host"], config["port"], encoded_name)
            self.connection = websocket.create_connection(
                url, config["timeout"])

        try:
            self._read_response()
        except Exception:
            # Do not leak the socket when the first response cannot be
            # received, parsed or accepted.
            self.close()
            raise

    def __enter__(self):
        """Return the remote itself for use in a ``with`` block."""
        return self

    def __exit__(self, type, value, traceback):
        """Close the connection when the ``with`` block ends."""
        self.close()

    def close(self):
        """Close the connection; does nothing if it is already closed."""
        if self.connection:
            self.connection.close()
            self.connection = None
            logging.debug("Connection closed.")

    def control(self, key):
        """Send a control command.

        Args:
            key: Key code placed in the ``DataOfCmd`` field of the request.

        Raises:
            exceptions.ConnectionClosed: If the connection has been closed.
        """
        if not self.connection:
            raise exceptions.ConnectionClosed()

        payload = json.dumps({
            "method": "ms.remote.control",
            "params": {
                "Cmd": "Click",
                "DataOfCmd": key,
                "Option": "false",
                "TypeOfRemote": "SendRemoteKey",
            },
        })

        logging.info("Sending control command: %s", key)
        self.connection.send(payload)
        # Pause after every key press before returning to the caller.
        time.sleep(self._key_interval)

    def _load_token(self):
        """Return the token saved in ``self.token_file``, or ``None``.

        Surrounding whitespace (such as a trailing newline) is stripped so it
        cannot end up in the connection URL; an empty file counts as no token.
        """
        if not os.path.isfile(self.token_file):
            return None
        with open(self.token_file, "r") as token_file:
            token = token_file.readline().strip()
        return token or None

    def _read_response(self):
        """Read one message from the TV and check that access was granted.

        A ``token`` found in the message's ``data`` object is written to
        ``self.token_file`` before the event type is checked.

        Raises:
            exceptions.UnhandledResponse: If the message is not a JSON object
                whose ``event`` is ``ms.channel.connect``. The connection is
                closed first.
        """
        response = json.loads(self.connection.recv())
        is_object = isinstance(response, dict)

        data = response.get("data") if is_object else None
        if isinstance(data, dict) and "token" in data:
            with open(self.token_file, "w") as token_file:
                # str() keeps a numeric token from raising in write().
                token_file.write(str(data["token"]))

        event = response.get("event") if is_object else None
        if event != "ms.channel.connect":
            self.close()
            raise exceptions.UnhandledResponse(response)

        logging.debug("Access granted.")

    @staticmethod
    def _serialize_string(string):
        """Return *string* base64-encoded as text.

        ``str`` input is encoded as UTF-8 first; bytes are used as given.
        """
        if isinstance(string, str):
            string = string.encode("utf-8")
        return base64.b64encode(string).decode("utf-8")