Skip to content

Commit

Permalink
feat: python server (#6)
Browse files Browse the repository at this point in the history
- every execution needs a unique id to identify calculated values
- executes compiled safe-ds code in memory (using a finder / loader)
- pipelines are executed in child processes
- communication from child processes to the main process uses a shared
memory queue
- allows connections via websocket (and http)
- listens on a provided port (default 5000)
- uses flask and flask-sock for HTTP and WS
- uses gevent as WSGI implementation

---------

Co-authored-by: Jonas B <97200640+smitedeluxe@users.noreply.github.com>
Co-authored-by: megalinter-bot <129584137+megalinter-bot@users.noreply.github.com>
Co-authored-by: Lars Reimann <mail@larsreimann.com>
  • Loading branch information
4 people committed Nov 30, 2023
1 parent 1cb1ba1 commit a2c4f0f
Show file tree
Hide file tree
Showing 11 changed files with 1,806 additions and 1 deletion.
4 changes: 4 additions & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,7 @@
exclude_lines =
pragma: no cover
if\s+(typing\.)?TYPE_CHECKING:

[run]
parallel = True
concurrency = multiprocessing, thread
415 changes: 414 additions & 1 deletion poetry.lock

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ packages = [
[tool.poetry.dependencies]
python = "^3.11,<3.13"
safe-ds = ">=0.14,<0.17"
flask = "^3.0.0"
flask-cors = "^4.0.0"
flask-sock = "^0.7.0"
gevent = "^23.9.1"

[tool.poetry.dev-dependencies]
pytest = "^7.4.3"
Expand Down
1 change: 1 addition & 0 deletions src/safeds_runner/server/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Infrastructure for dynamically running Safe-DS pipelines and communication with the VS Code extension."""
188 changes: 188 additions & 0 deletions src/safeds_runner/server/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
"""Module containing the main entry point, for starting the Safe-DS runner."""

import argparse
import json
import logging

import flask.app
import flask_sock
import simple_websocket
from flask import Flask
from flask_cors import CORS
from flask_sock import Sock

from safeds_runner.server import messages
from safeds_runner.server.messages import (
Message,
create_placeholder_value,
message_type_placeholder_value,
parse_validate_message,
)
from safeds_runner.server.pipeline_manager import PipelineManager


def create_flask_app(testing: bool = False) -> flask.app.App:
"""
Create a flask app, that handles all requests.
Parameters
----------
testing : bool
Whether the app should run in a testing context.
Returns
-------
flask.app.App
Flask app.
"""
flask_app = Flask(__name__)
# Websocket Configuration
flask_app.config["SOCK_SERVER_OPTIONS"] = {"ping_interval": 25}
flask_app.config["TESTING"] = testing

# Allow access from VSCode extension
CORS(flask_app, resources={r"/*": {"origins": "vscode-webview://*"}})
return flask_app


def create_flask_websocket(flask_app: flask.app.App) -> flask_sock.Sock:
"""
Create a flask websocket extension.
Parameters
----------
flask_app: flask.app.App
Flask App Instance.
Returns
-------
flask_sock.Sock
Websocket extension for the provided flask app.
"""
return Sock(flask_app)


app = create_flask_app()
sock = create_flask_websocket(app)
app_pipeline_manager = PipelineManager()


@sock.route("/WSMain")
def _ws_main(ws: simple_websocket.Server) -> None:
ws_main(ws, app_pipeline_manager) # pragma: no cover


def ws_main(ws: simple_websocket.Server, pipeline_manager: PipelineManager) -> None:
"""
Handle websocket requests to the WSMain endpoint.
This function handles the bidirectional communication between the runner and the VS Code extension.
Parameters
----------
ws : simple_websocket.Server
Websocket Connection, provided by flask.
pipeline_manager : PipelineManager
Manager used to execute pipelines on, and retrieve placeholders from
"""
logging.debug("Request to WSRunProgram")
pipeline_manager.set_new_websocket_target(ws)
while True:
# This would be a JSON message
received_message: str = ws.receive()
if received_message is None:
logging.debug("Received EOF, closing connection")
ws.close()
return
logging.debug("Received Message: %s", received_message)
received_object, error_detail, error_short = parse_validate_message(received_message)
if received_object is None:
logging.error(error_detail)
ws.close(message=error_short)
return
match received_object.type:
case "program":
program_data, invalid_message = messages.validate_program_message_data(received_object.data)
if program_data is None:
logging.error("Invalid message data specified in: %s (%s)", received_message, invalid_message)
ws.close(None, invalid_message)
return
# This should only be called from the extension as it is a security risk
pipeline_manager.execute_pipeline(program_data, received_object.id)
case "placeholder_query":
placeholder_query_data, invalid_message = messages.validate_placeholder_query_message_data(
received_object.data,
)
if placeholder_query_data is None:
logging.error("Invalid message data specified in: %s (%s)", received_message, invalid_message)
ws.close(None, invalid_message)
return
placeholder_type, placeholder_value = pipeline_manager.get_placeholder(
received_object.id,
placeholder_query_data,
)
# send back a value message
if placeholder_type is not None:
send_websocket_message(
ws,
Message(
message_type_placeholder_value,
received_object.id,
create_placeholder_value(placeholder_query_data, placeholder_type, placeholder_value),
),
)
else:
# Send back empty type / value, to communicate that no placeholder exists (yet)
# Use name from query to allow linking a response to a request on the peer
send_websocket_message(
ws,
Message(
message_type_placeholder_value,
received_object.id,
create_placeholder_value(placeholder_query_data, "", ""),
),
)
case _:
if received_object.type not in messages.message_types:
logging.warning("Invalid message type: %s", received_object.type)


def send_websocket_message(connection: simple_websocket.Server, message: Message) -> None:
"""
Send any message to the VS Code extension.
Parameters
----------
connection : simple_websocket.Server
Websocket connection.
message : Message
Object that will be sent.
"""
connection.send(json.dumps(message.to_dict()))


def main() -> None: # pragma: no cover
"""
Execute the runner application.
Main entry point of the runner application.
"""
# Allow prints to be unbuffered by default
import builtins
import functools

builtins.print = functools.partial(print, flush=True) # type: ignore[assignment]

logging.getLogger().setLevel(logging.DEBUG)
from gevent.pywsgi import WSGIServer

parser = argparse.ArgumentParser(description="Start Safe-DS Runner on a specific port.")
parser.add_argument("--port", type=int, default=5000, help="Port on which to run the python server.")
args = parser.parse_args()
logging.info("Starting Safe-DS Runner on port %s", str(args.port))
# Only bind to host=127.0.0.1. Connections from other devices should not be accepted
WSGIServer(("127.0.0.1", args.port), app).serve_forever()


if __name__ == "__main__":
main() # pragma: no cover

0 comments on commit a2c4f0f

Please sign in to comment.