Skip to content

Commit

Permalink
Merge pull request #940 from UCL-INGI/fix_887
Browse files Browse the repository at this point in the history
[agent/docker] replace SSH opt-in by env types
  • Loading branch information
Drumor committed Apr 20, 2023
2 parents 145fc84 + 6fcd7c5 commit b3f66e3
Show file tree
Hide file tree
Showing 10 changed files with 42 additions and 59 deletions.
5 changes: 2 additions & 3 deletions inginious/agent/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class Agent(object, metaclass=ABCMeta):
An INGInious agent, that grades specific kinds of jobs, and interacts with a Backend.
"""

def __init__(self, context, backend_addr, friendly_name, concurrency, filesystem, ssh_allowed=False):
def __init__(self, context, backend_addr, friendly_name, concurrency, filesystem):
"""
:param context: a ZMQ context to which the agent will be linked
:param backend_addr: address of the backend to which the agent should connect. The format is the same as ZMQ
Expand All @@ -66,7 +66,6 @@ def __init__(self, context, backend_addr, friendly_name, concurrency, filesystem
self.__backend_addr = backend_addr
self.__context = context
self.__friendly_name = friendly_name
self.__ssh_allowed = ssh_allowed
self.__backend_socket = self.__context.socket(zmq.DEALER)
self.__backend_socket.ipv6 = True

Expand Down Expand Up @@ -112,7 +111,7 @@ async def run(self):

# Tell the backend we are up and have `concurrency` threads available
self._logger.info("Saying hello to the backend")
await ZMQUtils.send(self.__backend_socket, AgentHello(self.__friendly_name, self.__concurrency, self.environments, self.__ssh_allowed))
await ZMQUtils.send(self.__backend_socket, AgentHello(self.__friendly_name, self.__concurrency, self.environments))
self.__backend_last_seen_time = time.time()

run_listen = self._loop.create_task(self.__run_listen())
Expand Down
42 changes: 23 additions & 19 deletions inginious/agent/docker_agent/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ class DockerRunningJob:
assigned_external_ports: List[int]
student_containers: Set[str] # container ids of student containers
enable_network: bool
ssh_allowed: bool


@dataclass
Expand Down Expand Up @@ -80,8 +79,7 @@ def __init__(self, context, backend_addr, friendly_name, concurrency, tasks_fs:
:param runtime: runtime used by docker (the defaults are "runc" with docker or "kata-runtime" with kata)
:param ssh_allowed: boolean to make this agent accept tasks with ssh or not
"""
super(DockerAgent, self).__init__(context, backend_addr, friendly_name, concurrency, tasks_fs,
ssh_allowed=ssh_allowed)
super(DockerAgent, self).__init__(context, backend_addr, friendly_name, concurrency, tasks_fs)

self._runtimes = {x.envtype: x for x in runtimes} if runtimes is not None else None

Expand All @@ -102,6 +100,9 @@ def __init__(self, context, backend_addr, friendly_name, concurrency, tasks_fs:
self._aos = AsyncProxy(os)
self._ashutil = AsyncProxy(shutil)

# Does this agent allow ssh_student ?
self._ssh_allowed = ssh_allowed

async def _init_clean(self):
""" Must be called when the agent is starting """
# Data about running containers
Expand Down Expand Up @@ -281,7 +282,6 @@ def __new_job_sync(self, message: BackendNewJob, future_results):

try:
enable_network = message.environment_parameters.get("network_grading", False)
ssh_allowed = message.environment_parameters.get("ssh_allowed", False)
limits = message.environment_parameters.get("limits", {})
time_limit = int(limits.get("time", 30))
hard_time_limit = int(limits.get("hard_time", None) or time_limit * 3)
Expand Down Expand Up @@ -413,8 +413,7 @@ def __new_job_sync(self, message: BackendNewJob, future_results):
run_cmd=run_cmd,
assigned_external_ports=list(ports.values()),
student_containers=set(),
enable_network=enable_network,
ssh_allowed=ssh_allowed
enable_network=enable_network
)

self._containers_running[container_id] = info
Expand Down Expand Up @@ -675,7 +674,7 @@ async def handle_running_container(self, info: DockerRunningJob, future_results)
ssh = msg["ssh"]
run_as_root = msg["run_as_root"]
assert "/" not in socket_id # ensure task creator do not try to break the agent :-(
if ssh and not (info.enable_network and info.ssh_allowed):
if ssh and not (info.enable_network and "ssh" in info.environment_type and self._ssh_allowed):
self._logger.error(
"Exception: ssh for student requires to allow ssh and internet access in the task %s environment configuration tab",
info.job_id)
Expand Down Expand Up @@ -980,22 +979,27 @@ async def run(self):

def _detect_runtimes(self) -> Dict[str, DockerRuntime]:
heuristic = [
("runc", lambda x: DockerRuntime(runtime=x, run_as_root=False, enables_gpu=False, shared_kernel=True, envtype="docker")),
("crun", lambda x: DockerRuntime(runtime=x, run_as_root=False, enables_gpu=False, shared_kernel=True, envtype="docker")),
("kata", lambda x: DockerRuntime(runtime=x, run_as_root=True, enables_gpu=False, shared_kernel=False, envtype="kata")),
("nvidia", lambda x: DockerRuntime(runtime=x, run_as_root=False, enables_gpu=True, shared_kernel=True, envtype="nvidia"))
("runc", lambda x, y: DockerRuntime(runtime=x, run_as_root=False, enables_gpu=False,
shared_kernel=True, envtype="docker-ssh" if y else "docker")),
("crun", lambda x, y: DockerRuntime(runtime=x, run_as_root=False, enables_gpu=False,
shared_kernel=True, envtype="docker-ssh" if y else "docker")),
("kata", lambda x, y: DockerRuntime(runtime=x, run_as_root=True, enables_gpu=False,
shared_kernel=False, envtype="kata-ssh" if y else "kata")),
("nvidia", lambda x, y: DockerRuntime(runtime=x, run_as_root=False, enables_gpu=True,
shared_kernel=True, envtype="nvidia-ssh" if y else "nvidia"))
]
retval = {}

for runtime in self._docker.sync.list_runtimes().keys():
for h_runtime, f in heuristic:
if h_runtime in runtime:
v = f(runtime)
if v.envtype not in retval:
self._logger.info("Using %s as runtime with parameters %s", runtime, str(v))
retval[v.envtype] = v
else:
self._logger.warning(
"%s was detected as a runtime; it would duplicate another one, so we ignore it. %s",
runtime, str(v))
for ssh_allowed in {self._ssh_allowed, False}:
v = f(runtime, ssh_allowed)
if v.envtype not in retval:
self._logger.info("Using %s as runtime with parameters %s", runtime, str(v))
retval[v.envtype] = v
else:
self._logger.warning(
"%s was detected as a runtime; it would duplicate another one, so we ignore it. %s",
runtime, str(v))
return retval
20 changes: 5 additions & 15 deletions inginious/backend/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

RunningJob = namedtuple('RunningJob', ['agent_addr', 'client_addr', 'msg', 'time_started'])
EnvironmentInfo = namedtuple('EnvironmentInfo', ['last_id', 'created_last', 'agents', 'type'])
AgentInfo = namedtuple('AgentInfo', ['name', 'environments', 'ssh_allowed']) # environments is a list of tuple (type, environment)
AgentInfo = namedtuple('AgentInfo', ['name', 'environments']) # environments is a list of tuple (type, environment)

class Backend(object):
"""
Expand Down Expand Up @@ -138,7 +138,7 @@ async def handle_client_new_job(self, client_addr, message: ClientNewJob):
self._logger.info("Adding a new job %s %s to the queue", client_addr, message.job_id)
job = WaitingJob(message.priority, time.time(), client_addr, message.job_id, message)
self._waiting_jobs[message.job_id] = job
self._waiting_jobs_pq.put((message.environment_type, message.environment, self._get_ssh_allowed(message)), job)
self._waiting_jobs_pq.put((message.environment_type, message.environment), job)

await self.update_queue()

Expand Down Expand Up @@ -196,10 +196,7 @@ async def update_queue(self):
job = None
while job is None:
# keep the object, do not unzip it directly! It's sometimes modified when a job is killed.

topics = [(*env, False) for env in self._registered_agents[agent_addr].environments]
if self._registered_agents[agent_addr].ssh_allowed:
topics += [(*env, True) for env in self._registered_agents[agent_addr].environments]
topics = self._registered_agents[agent_addr].environments

job = self._waiting_jobs_pq.get(topics)
priority, insert_time, client_addr, job_id, job_msg = job
Expand Down Expand Up @@ -238,7 +235,7 @@ async def handle_agent_hello(self, agent_addr, message: AgentHello):

self._registered_agents[agent_addr] = AgentInfo(message.friendly_name,
[(etype, env) for etype, envs in
message.available_environments.items() for env in envs], message.ssh_allowed)
message.available_environments.items() for env in envs])
self._available_agents.extend([agent_addr for _ in range(0, message.available_job_slots)])
self._ping_count[agent_addr] = 0

Expand Down Expand Up @@ -407,11 +404,4 @@ def _get_time_limit_estimate(self, job_info: ClientNewJob):
try:
return int(job_info.environment_parameters["limits"]["time"])
except:
return -1 # unknown

def _get_ssh_allowed(self, job_info: ClientNewJob):
"""
Returns if the job requires that the agent allows ssh
For this to work, ["ssh_allowed"] must be a parameter of the environment.
"""
return job_info.environment_parameters.get("ssh_allowed", False)
return -1 # unknown
1 change: 0 additions & 1 deletion inginious/common/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,6 @@ class AgentHello:
friendly_name: str # a string containing a friendly name to identify agent
available_job_slots: int # an integer giving the number of concurrent
available_environments: Dict[str, Dict[str, Dict[str, Any]]] # dict of available environments:
ssh_allowed: bool
# {
# "type": {
# "name": { # for example, "default"
Expand Down
3 changes: 3 additions & 0 deletions inginious/frontend/environment_types/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ def register_env_type(env_obj):
def register_base_env_types():
# register standard env types here
register_env_type(DockerEnvType())
register_env_type(DockerEnvType(ssh_allowed=True))
register_env_type(NvidiaEnvType())
register_env_type(NvidiaEnvType(ssh_allowed=True))
register_env_type(KataEnvType())
register_env_type(KataEnvType(ssh_allowed=True))
register_env_type(MCQEnvType())
4 changes: 2 additions & 2 deletions inginious/frontend/environment_types/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
class DockerEnvType(GenericDockerOCIRuntime):
@property
def id(self):
return "docker"
return "docker-ssh" if self._ssh_allowed else "docker"

@property
def name(self):
return _("Standard container (Docker)")
return _("Standard container + SSH") if self._ssh_allowed else _("Standard container")
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,6 @@ def check_task_environment_parameters(self, data):
# Network access in grading container?
out["network_grading"] = data.get("network_grading", False)

# SSH allowed ?
out["ssh_allowed"] = data.get("ssh_allowed", False)
if out["ssh_allowed"] == 'on':
out["ssh_allowed"] = True

# Limits
limits = {"time": 20, "memory": 1024, "disk": 1024}
if "limits" in data:
Expand All @@ -59,3 +54,6 @@ def check_task_environment_parameters(self, data):
def studio_env_template(self, templator, task, allow_html: bool):
return templator.render("course_admin/edit_tabs/env_generic_docker_oci.html", env_params=task.get("environment_parameters", {}),
content_is_html=allow_html, env_id=self.id)

def __init__(self, ssh_allowed=False):
self._ssh_allowed = ssh_allowed
4 changes: 2 additions & 2 deletions inginious/frontend/environment_types/kata.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
class KataEnvType(GenericDockerOCIRuntime):
@property
def id(self):
return "kata"
return "kata-ssh" if self._ssh_allowed else "kata"

@property
def name(self):
return _("Container running as root (Kata)")
return _("Container running as root (Kata) + SSH") if self._ssh_allowed else _("Container running as root (Kata)")
4 changes: 2 additions & 2 deletions inginious/frontend/environment_types/nvidia.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
class NvidiaEnvType(GenericDockerOCIRuntime):
@property
def id(self):
return "nvidia"
return "nvidia-ssh" if self._ssh_allowed else "nvidia"

@property
def name(self):
return _("Container with GPUs (NVIDIA)")
return _("Container with GPUs (NVIDIA) + SSH") if self._ssh_allowed else _("Container with GPUs (NVIDIA)")
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,6 @@
</label></div>
</div>
</div>
<div class="form-group row">
<label for="{{env_id}}-ssh-allowed" class="col-sm-4 control-label">{{ _("Allow ssh?") }}</label>

<div class="col-sm-8">
<div class="checkbox"><label>
<input type="checkbox" id="{{env_id}}-ssh-allowed" name="envparams[{{env_id}}][ssh_allowed]"
{{'checked="checked"' if env_params.get('ssh_allowed',False) }} />&nbsp;
</label></div>
</div>
</div>

<div class="form-group row">
<label for="{{env_id}}-run-cmd" class="col-sm-4 control-label">{{ _("Custom command to be run in container <small>(instead of running the run script)</small>") | safe}}</label>
Expand Down

0 comments on commit b3f66e3

Please sign in to comment.