-
Notifications
You must be signed in to change notification settings - Fork 0
/
server_add_dialog.py
139 lines (122 loc) · 5.09 KB
/
server_add_dialog.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import logging
import time
import psutil
from gi.repository import Gtk, Adw, GObject
import socket
from socket import (
AF_INET,
AF_INET6,
SO_BINDTODEVICE,
SO_BROADCAST,
SOCK_DGRAM,
IPPROTO_UDP,
SOL_SOCKET,
)
from src import build_constants
from src.components.server_row import ServerRow
from src.reactive_set import ReactiveSet
from src.server import Server
@Gtk.Template(resource_path=build_constants.PREFIX + "/templates/server_add_dialog.ui")
class ServerAddDialog(Adw.Window):
__gtype_name__ = "MarmaladeServerAddDialog"
cancel_button = Gtk.Template.Child()
manual_add_button = Gtk.Template.Child()
manual_add_editable = Gtk.Template.Child()
detected_server_rows_group = Gtk.Template.Child()
detected_servers_spinner = Gtk.Template.Child()
detected_servers: ReactiveSet[Server]
@GObject.Signal(name="cancelled")
def cancelled(self):
"""Signal emitted when the dialog is cancelled"""
@GObject.Signal(name="server-picked", arg_types=[object])
def server_picked(self, _server: Server):
"""Signal emitted when a server is picked"""
# FIXME cannot emit with GObject.Object descendant in the args it seems ???
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
self.cancel_button.connect("clicked", self.on_cancel_button_clicked)
self.manual_add_button.connect("clicked", self.on_manual_button_clicked)
self.detected_servers = ReactiveSet()
self.detected_servers.emitter.connect(
"item-added", self.on_detected_server_added
)
# TODO run async
self.detect_servers_thread_func()
def detect_servers_thread_func(self) -> None:
for name, address_info_list in psutil.net_if_addrs().items():
for address_info in address_info_list:
if (
address_info.family not in (AF_INET, AF_INET6)
or address_info.broadcast is None
):
continue
# TODO Run async
self.detect_servers_on_interface(
name, address_info.family, address_info.broadcast
)
def detect_servers_on_interface(
self,
interface_name: str,
address_family: socket.AddressFamily,
broadcast_address: str,
) -> None:
MSG_ENCODING = "utf-8"
RECV_BUFFER_SIZE = 4096
JELLYFIN_DISCOVER_PORT = 7359
DISCOVER_SEND_TIMEOUT_SECONDS = 5
# TODO use 30 seconds when not testing
DISCOVER_RECV_TIMEOUT_SECONDS = 5
with socket.socket(
family=address_family, type=SOCK_DGRAM, proto=IPPROTO_UDP
) as sock:
logging.debug(
"Discovering servers on %s (%s) %s",
interface_name,
address_family,
broadcast_address,
)
# Prepare sockets
interface_name_buffer = bytearray(interface_name, encoding="utf-8")
sock.setsockopt(SOL_SOCKET, SO_BINDTODEVICE, interface_name_buffer)
sock.setsockopt(SOL_SOCKET, SO_BROADCAST, 1)
sock.settimeout(DISCOVER_SEND_TIMEOUT_SECONDS)
# Broadcast discovery message
msg = bytearray("Who is JellyfinServer?", encoding=MSG_ENCODING)
try:
sock.sendto(msg, (broadcast_address, JELLYFIN_DISCOVER_PORT))
except TimeoutError:
logging.error("Server discovery broadcast send timed out")
return
# Listen for responses
start = time.time()
elapsed = 0
logging.debug("Listening for server responses")
while elapsed < DISCOVER_RECV_TIMEOUT_SECONDS:
sock.settimeout(DISCOVER_RECV_TIMEOUT_SECONDS - elapsed)
try:
(data, address_info) = sock.recvfrom(RECV_BUFFER_SIZE)
except TimeoutError:
logging.info("Server discovery receive timed out")
break
msg = data.decode(encoding=MSG_ENCODING)
# TODO handle response properly
logging.debug("Response from %s: %s", address_info, msg)
elapsed = time.time() - start
def on_detected_server_added(self, _emitter, server: Server) -> None:
row = ServerRow(server, "list-add-symbolic")
row.connect("button-clicked", self.on_detected_row_button_clicked)
self.detected_server_rows_group.add(row)
def on_cancel_button_clicked(self, _button) -> None:
self.emit("cancelled")
self.close()
def on_manual_button_clicked(self, _button: Gtk.Widget) -> None:
address = self.manual_add_editable.get_text()
# TODO test the address (notify failure visually)
# TODO get the server name
name = "TODO: Check server address + query its name"
server = Server(name, address)
self.emit("server-picked", server)
self.close()
def on_detected_row_button_clicked(self, server_row: ServerRow) -> None:
self.emit("server-picked", server_row.server)
self.close()