/
main
executable file
·289 lines (232 loc) · 11.1 KB
/
main
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
#!/usr/bin/env python3
import asyncio
import argparse
import glob
import logging
import os
import random
import threading
import time
import shutil
from flask import Flask, render_template, request, redirect, url_for, jsonify
from elk_bridge import ElkBridge
from sound_downloader import download_sound
from helpers import get_all_sound_usage_logs, get_platform, get_command_output, ALLOWED_AUDIO_FILE_EXTENSIONS
from ui_state_manager.manager import state_manager, source_plugin_interface
from freesound_interface import get_access_token_from_code, get_currently_logged_in_user
from system_stats import collect_system_stats
# --- Config and globals ------------------------------------------------------------------------
disable_flask_logging = True
if disable_flask_logging:
log = logging.getLogger('werkzeug')
log.setLevel(logging.ERROR)
elk_bridge_refresh_fps = 20
system_stats_refresh_fps = 1
current_frame_directory = 'static/frame/'
system_stats_thread = None
elk_bridge = None
elk_bridge_thread = None
http_server = Flask(__name__, template_folder='/udata/source/app/' if get_platform() == "ELK" else 'html/')
if get_platform() != "ELK":
# Copy main ui html file to template folder so it can also be served directly from there
plugin_ui_html_file_path = '../../SourceSampler/Resources/ui_plugin_ws.html'
shutil.copy(plugin_ui_html_file_path, 'html/ui_plugin_copy.html')
# --- Thread utils ------------------------------------------------------------------------
class ElkBridgeThread(threading.Thread):
def run(self):
global elk_bridge
asyncio.set_event_loop(asyncio.new_event_loop())
print('* Starting ElkBridge')
elk_bridge = ElkBridge(state_manager, source_plugin_interface, elk_bridge_refresh_fps)
elk_bridge.run()
class CollectSystemStatsThread(threading.Thread):
def run(self):
asyncio.set_event_loop(asyncio.new_event_loop())
print('* Starting system stats loop')
while True:
time.sleep(1.0 / system_stats_refresh_fps)
collect_system_stats()
class DownloadSoundsThread(threading.Thread):
download_url = None
download_to_path = None
sound_uuid = None
use_header = None
def __init__(self, _download_url, _download_to_path, _sound_uuid, _use_header):
super(DownloadSoundsThread, self).__init__()
self.download_url = _download_url
self.download_to_path = _download_to_path
self.sound_uuid = _sound_uuid
self.use_header = _use_header
def run(self):
asyncio.set_event_loop(asyncio.new_event_loop())
download_sound(
self.download_url, self.download_to_path, self.sound_uuid, self.use_header, source_plugin_interface)
# --- HTTP Server ------------------------------------------------------------------------
@http_server.route('/download_sound', methods=['GET']) # Download the sounds requested by the plugin
def download_sound_endpoint():
sound_uuid = request.args['soundUUID']
download_url = request.args['urlToDownloadFrom']
download_to_path = request.args['pathToSaveDownloadedFile']
use_header = request.args['downloadHeaders']
DownloadSoundsThread(download_url, download_to_path, sound_uuid, use_header).start()
return 'Downloading async...'
@http_server.route('/', methods=['GET', 'POST']) # Serve main interface HTML file
def index():
tvars = {'freesound_username': get_currently_logged_in_user()}
if request.method == 'GET':
return render_template("index.html", **tvars)
elif request.method == 'POST':
if request.form.get('presetIdx', False) != False:
# Sending preset files
try:
start_preset_idx = int(request.form.get('presetIdx'))
except ValueError:
return render_template("index.html", message_presets='Wrong preset number')
preset_files_directory = source_plugin_interface.get_preset_files_path()
if preset_files_directory is not None:
n_saved = 0
uploaded_files = request.files.getlist("file[]")
for f in uploaded_files:
if f.filename.split('.')[-1] in ['xml']: # Check that format is ok
f.save(os.path.join(preset_files_directory, '{}.xml'.format(start_preset_idx + n_saved)))
n_saved += 1
tvars.update(message_presets='{} presets transferred'.format(n_saved))
return render_template("index.html", **tvars)
else:
return render_template("index.html", message_presets='Errors transferring presets')
else:
# Sending sound files
save_to_local_files_directory = request.form.get('toLocal', False) != False
if save_to_local_files_directory:
files_directory = source_plugin_interface.get_local_audio_files_path()
else:
files_directory = source_plugin_interface.get_sound_audio_files_path()
if files_directory is not None:
n_saved = 0
uploaded_files = request.files.getlist("file[]")
for f in uploaded_files:
if f.filename.split('.')[-1] in ALLOWED_AUDIO_FILE_EXTENSIONS: # Check that format is ok
f.save(os.path.join(files_directory, f.filename))
n_saved += 1
tvars.update(message_sounds='{} sounds transferred'.format(n_saved))
return render_template("index.html", **tvars)
else:
return render_template("index.html", message_sounds='Errors transferring sounds')
@http_server.route('/update_html_ui_state', methods=['GET'])
def update_html_ui_state():
# Client requests to get an updated version of the state
state = {
'should_ask_for_data': False,
'should_load_url': False
}
if state_manager.is_waiting_for_data_from_web():
state['should_ask_for_data'] = True
state['web_form_id'] = state_manager.current_state.web_form_id
state['web_form_data'] = state_manager.current_state.data_for_web_form_id
if state_manager.open_url_in_browser is not None:
state['should_load_url'] = state_manager.open_url_in_browser
state_manager.open_url_in_browser = None
return jsonify(state)
@http_server.route('/usage_log', methods=['GET'])
def usage_log():
# Serve interface HTML file with all the available controls
tvars = {'log': get_all_sound_usage_logs()}
return render_template("sound_usage_log.html", **tvars)
@http_server.route('/freesound_login_callback', methods=['GET'])
def freesound_login_callback():
# Serve interface HTML file with all the available controls
code = request.args.get('code')
get_access_token_from_code(code)
return redirect(url_for('index'))
@http_server.route('/simulator', methods=['GET'])
def simulator():
# Serve the interface which simulates blackboard
tvars = {}
return render_template("simulator.html", **tvars)
@http_server.route('/simulator_get_display', methods=['GET'])
def simulator_get_display():
# Get contents of the display
global current_frame_directory
if not os.path.exists(current_frame_directory):
os.makedirs(current_frame_directory)
files = glob.glob(os.path.join(current_frame_directory, '*'))
for f in files:
os.remove(f)
path = os.path.join(current_frame_directory, 'frame{0}.jpg'.format(random.randint(0, 9999999)))
if elk_bridge is not None and elk_bridge.current_frame is not None:
elk_bridge.current_frame.save(path)
return path
else:
return ''
@http_server.route('/simulator_user_action', methods=['GET'])
def simulator_user_action():
# Simulate user pressing a button, knob, etc
global elk_bridge
action = request.args['action']
value = request.args['value']
if action.startswith("button"):
button_idx = int(action.split('_')[2])
if elk_bridge is not None:
elk_bridge.handle_buttons(button_idx, value)
elif action.startswith("slider"):
slider_idx = int(action.split('_')[1])
if elk_bridge is not None:
elk_bridge.handle_faders(slider_idx, float(value))
elif action.startswith("encoder"):
sub_action = action.split('_')[1]
if sub_action == 'rotate':
elk_bridge.handle_encoder(int(value))
elif sub_action == 'press':
elk_bridge.handle_encoder_button(1.0)
elif sub_action == 'release':
elk_bridge.handle_encoder_button(0.0)
return 'Action received'
@http_server.route('/get_source_state', methods=['GET'])
def get_source_state():
# Gets the internal XML state of the plugin and the processed JSON version, used by the simulator
if source_plugin_interface.sss.state_soup is not None:
data = {
'pluginState': source_plugin_interface.sss.state_soup.prettify(),
}
else:
data = {
'pluginState': 'No state',
}
return jsonify(data)
@http_server.route('/receive_data_from_web', methods=['GET'])
def receive_data_from_web():
# Endpoint used to recieve data form the web interface (for UI options that require web interface for example)
# "data" is a list of key/value pairs serialized as: key1:value1,key2:value2,key3:value3...
data = request.args.get('data', "")
data_dict = {element.split(':')[0]: element.split(':')[1] for element in data.split(',')}
state_manager.process_data_from_web(data_dict)
return 'Thanks!'
@http_server.route('/send_message_to_plugin', methods=['GET']) # Forwards the request contents as an OSC message to the plugin
def sensend_message_to_plugin():
address = request.args['address']
values = [getattr(__builtins__, type_name)(value) for type_name, value in
zip(request.args['types'].split(';'), request.args['values'].split(';'))]
source_plugin_interface.send_msg_to_plugin(address, values)
return 'Message sent'
@http_server.route('/ui_plugin', methods=['GET'])
def plugin_ui():
# Serve the main plugin ui as it is done in desktop computers
tvars = {}
return render_template("ui_plugin_copy.html", **tvars)
# --- Main ------------------------------------------------------------------------
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--http_port", type=int, default=8123, help="The port the web server should listen at")
args = parser.parse_args()
# If on ELK platform, start the thread that will do the communication with the hardware, draw on display, etc.
if elk_bridge_refresh_fps:
ElkBridgeThread().start()
# Also if on ELK platform, start the thread that will collect and update system stats periodically
if system_stats_refresh_fps:
collect_system_stats()
CollectSystemStatsThread().start()
print('* Running aconnect')
get_command_output("aconnect 16 128")
# Start HTTP server that will receive state updates from the plugin and implements the web interface
print('* Starting HTTP server at {}'.format(args.http_port))
http_server.run(host='0.0.0.0', port=args.http_port, debug=False)