Skip to content

Commit

Permalink
Use additional labels, allowing multiple instances
Browse files Browse the repository at this point in the history
  • Loading branch information
remram44 committed Apr 21, 2023
1 parent 78c5028 commit 799b76e
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 10 deletions.
2 changes: 2 additions & 0 deletions k8s/helm/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ metadata:
{{- include "reproserver.labels" . | nindent 4 }}
data:
runner.namespace: {{ .Release.Namespace }}
runner.pod_labels: |
{{- include "reproserver.labels" . | nindent 4 }}
runner.pod_spec: |
restartPolicy: Never
{{- with .Values.dockerInDocker.imagePullSecrets }}
Expand Down
30 changes: 20 additions & 10 deletions reproserver/run/k8s.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
logger = logging.getLogger(__name__)


def labels_to_string(labels_dict):
return ','.join(k + '=' + v for k, v in labels_dict.items())


class InternalProxyHandler(ProxyHandler):
def select_destination(self):
# Authentication
Expand Down Expand Up @@ -60,6 +64,10 @@ def __init__(self, connector):
super(K8sRunner, self).__init__(connector)

self.config_dir = os.environ['K8S_CONFIG_DIR']
with open(os.path.join(self.config_dir, 'runner.namespace')) as fp:
self.namespace = fp.read().strip()
with open(os.path.join(self.config_dir, 'runner.pod_labels')) as fp:
self.pod_labels = yaml.safe_load(fp)

def _pod_name(self, run_id):
return 'run-{0}'.format(run_id)
Expand All @@ -79,8 +87,6 @@ async def run_inner(self, run_info):
# Load configuration from configmap volume
with open(os.path.join(self.config_dir, 'runner.pod_spec')) as fp:
pod_spec = yaml.safe_load(fp)
with open(os.path.join(self.config_dir, 'runner.namespace')) as fp:
namespace = fp.read().strip()

# Make required changes
for container in pod_spec['containers']:
Expand All @@ -99,15 +105,15 @@ async def run_inner(self, run_info):
kind='Pod',
metadata=k8s_client.V1ObjectMeta(
name=name,
labels={
labels=self.pod_labels | {
'app': 'run',
'run': str(run_id),
},
),
spec=pod_spec,
)
await v1.create_namespaced_pod(
namespace=namespace,
namespace=self.namespace,
body=pod,
)
logger.info("Pod created: %s", name)
Expand All @@ -118,13 +124,13 @@ async def run_inner(self, run_info):
kind='Service',
metadata=k8s_client.V1ObjectMeta(
name=name,
labels={
labels=self.pod_labels | {
'app': 'run',
'run': str(run_id),
},
),
spec=k8s_client.V1ServiceSpec(
selector={
selector=self.pod_labels | {
'app': 'run',
'run': str(run_id),
},
Expand All @@ -137,7 +143,7 @@ async def run_inner(self, run_info):
),
)
await v1.create_namespaced_service(
namespace=namespace,
namespace=self.namespace,
body=svc,
)
logger.info("Service created: %s", name)
Expand Down Expand Up @@ -211,6 +217,8 @@ def __init__(self, connector):
self.running = set()
with open(os.path.join(self.config_dir, 'runner.namespace')) as fp:
self.namespace = fp.read().strip()
with open(os.path.join(self.config_dir, 'runner.pod_labels')) as fp:
self.pod_labels = yaml.safe_load(fp)

def running_set_add(self, run_id):
if run_id in self.running:
Expand Down Expand Up @@ -246,7 +254,9 @@ async def watch(self):
watch = k8s_watch.Watch()
f, kwargs = v1.list_namespaced_pod, dict(
namespace=self.namespace,
label_selector='app=run',
label_selector=labels_to_string(
self.pod_labels | {'app': 'run'}
),
)
while True:
try:
Expand All @@ -264,7 +274,7 @@ async def _full_sync(self, api):
# Find existing run pods
pods = await v1.list_namespaced_pod(
namespace=self.namespace,
label_selector='app=run',
label_selector=labels_to_string(self.pod_labels | {'app': 'run'}),
)
for pod in pods.items:
run_id = int(pod.metadata.labels['run'], 10)
Expand All @@ -276,7 +286,7 @@ async def _full_sync(self, api):
# Find existing services
services = await v1.list_namespaced_service(
namespace=self.namespace,
label_selector='app=run',
label_selector=labels_to_string(self.pod_labels | {'app': 'run'}),
)
for service in services.items:
run_id = int(service.metadata.labels['run'], 10)
Expand Down

0 comments on commit 799b76e

Please sign in to comment.