Skip to content

Commit

Permalink
initial full version
Browse files Browse the repository at this point in the history
  • Loading branch information
miguelgarcia committed Feb 25, 2019
1 parent fde9f36 commit ce5db37
Show file tree
Hide file tree
Showing 9 changed files with 408 additions and 7 deletions.
2 changes: 1 addition & 1 deletion .gitignore
@@ -1,3 +1,3 @@
__pycache__/*
__pycache__
venv
.vscode
14 changes: 14 additions & 0 deletions Dockerfile
@@ -0,0 +1,14 @@
FROM python:3-alpine3.9 as base

FROM base as builder
RUN mkdir /install
WORKDIR /install
COPY requirements.txt /requirements.txt
RUN apk add --no-cache --virtual .build-deps gcc musl-dev libffi-dev openssl-dev
RUN pip install --install-option="--prefix=/install" -r /requirements.txt

FROM base
COPY --from=builder /install /usr/local
COPY src /exampleoperatorpy
WORKDIR /exampleoperatorpy
CMD ["python", "main.py"]
22 changes: 22 additions & 0 deletions Makefile
@@ -0,0 +1,22 @@
IMG?=exampleoperatorpy:dev

dep:
pip install -r requirements.txt

docker-build:
docker build . -t ${IMG}

# Install CRDs and RBACs into a cluster
install:
kubectl apply -f config/crds
kubectl apply -f config/rbac

# Deploy controller in the configured Kubernetes cluster in ~/.kube/config
deploy: install
kubectl apply -f config/default --namespace=system

# Remove controller in the configured Kubernetes cluster in ~/.kube/config
undeploy:
kubectl delete -f config/default --namespace=system
kubectl delete -f config/crds || true
kubectl delete -f config/rbac || true
82 changes: 82 additions & 0 deletions README.md
@@ -0,0 +1,82 @@
# exampleoperatorpy

This repository implements an example Kubernetes operator in Python 3, called "ImmortalContainers". This operator enables the user to define, using custom resources, containers that must run and if terminated must be restarted.

## Venv and project dependencies

To create a virtual env and install the project dependencies follow these steps:

```bash
python3 -m venv venv
. ./venv/bin/activate
make dep
```

## Install CRD and RBAC permissions

To install CRDs and RBAC configurations to your currently set cluster use:

```bash
make install
```

## Running the operator outside the cluster

```bash
. ./venv/bin/activate
python src/main.py --kubeconfig ~/.kube/config
```

## Running inside the cluster

You must first generate the image using `make docker-build` and push it to your repo.

If using **minikube** follow these steps:

```bash
eval $(minikube docker-env)
make docker-build
```

Then create the `system` namespace

```bash
kubectl apply -f config/namespace.yaml
```

And then run `make deploy`.

After this you should check that everything is running, ex:

```bash
$ kubectl get pods --namespace system
NAME READY STATUS RESTARTS AGE
exampleoperatorpy-controller-7cb7f99658-97zjs 1/1 Running 0 24m

$ kubectl logs exampleoperatorpy-controller-7cb7f99658-97zjs --namespace=system

INFO:controller:Controller starting
```

## Using the operator

Once the operator is running you can create immortal containers using a custom resource like this one:

```yaml
apiVersion: exampleoperator.flugel.it/v1alpha1
kind: ImmortalContainer
metadata:
name: example-immortal-container
spec:
image: nginx:latest
```

Run `kubectl apply -f config/example-use.yaml` to try it.

Then run `kubectl get pods` and check the pod is created. If you kill the pod it will be recreated.

## Remove the operator

To remove the operator, CDR and RBAC use `make undeploy`

Pods created by the operator will not be deleted, but will not be restarted if deleted later.
12 changes: 6 additions & 6 deletions config/default/default.yaml
@@ -1,19 +1,19 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: exampleoperator-controller
name: exampleoperatorpy-controller
labels:
app: exampleoperator-controller
app: exampleoperatorpy-controller
spec:
replicas: 1
selector:
matchLabels:
app: exampleoperator-controller
app: exampleoperatorpy-controller
template:
metadata:
labels:
app: exampleoperator-controller
app: exampleoperatorpy-controller
spec:
containers:
- image: exampleoperator:dev
name: exampleoperator-controller
- image: exampleoperatorpy:dev
name: exampleoperatorpy-controller
179 changes: 179 additions & 0 deletions src/controller.py
@@ -0,0 +1,179 @@
import logging
import queue
import threading
from pprint import pprint

from kubernetes.client.rest import ApiException
from kubernetes.client import models
import copy

logger = logging.getLogger('controller')

class Controller(threading.Thread):
"""Reconcile current and desired state by listening for events and making
calls to Kubernetes API.
"""
def __init__(self, pods_watcher, immortalcontainers_watcher, corev1api,
customsapi, custom_group, custom_version, custom_plural,
custom_kind, workqueue_size=10):
"""Initializes the controller.
:param pods_watcher: Watcher for pods events.
:param immortalcontainers_watcher: Watcher for immortalcontainers custom
resource events.
:param corev1api: kubernetes.client.CoreV1Api()
:param customsapi: kubernetes.client.CustomObjectsApi()
:param custom_group: The custom resource's group name
:param custom_version: The custom resource's version
:param custom_plural: The custom resource's plural name.
:param custom_kind: The custom resource's kind name.
:param workqueue_size: queue size for resources that must be processed.
"""
super().__init__()
# `workqueue` contains namespace/name of immortalcontainers whose status
# must be reconciled
self.workqueue = queue.Queue(workqueue_size)
self.pods_watcher = pods_watcher
self.immortalcontainers_watcher = immortalcontainers_watcher
self.corev1api = corev1api
self.customsapi = customsapi
self.custom_group = custom_group
self.custom_version = custom_version
self.custom_plural = custom_plural
self.custom_kind = custom_kind
self.pods_watcher.add_handler(self._handle_pod_event)
self.immortalcontainers_watcher.add_handler(self._handle_immortalcontainer_event)

def _handle_pod_event(self, event):
"""Handle an event from the pods watcher putting the pod's corresponding
immortalcontroller in the `workqueue`. """
obj = event['object']
owner_name = ""
if obj.metadata.owner_references is not None:
for owner_ref in obj.metadata.owner_references:
if owner_ref.api_version == self.custom_group+"/"+self.custom_version and \
owner_ref.kind == self.custom_kind:
owner_name = owner_ref.name
if owner_name != "":
self._queue_work(obj.metadata.namespace+"/"+owner_name)

def _handle_immortalcontainer_event(self, event):
"""Handle an event from the immortalcontainers watcher putting the
resource name in the `workqueue`."""
self._queue_work(event['object']['metadata']['namespace']+
"/"+event['object']['metadata']['name'])

def _queue_work(self, resource_key):
"""Add a resource name to the work queue."""
if len(resource_key.split("/")) != 2:
logger.error("Invalid resource key: {:s}".format(resource_key))
return
self.workqueue.put(resource_key)

def run(self):
"""Dequeue and process resources from the `workqueue`. This method
should not be called directly, but using `start()"""
self.running = True
logger.info('Controller starting')
while self.running:
e = self.workqueue.get()
if not self.running:
self.workqueue.task_done()
break
try:
self._reconcile_state(e)
self.workqueue.task_done()
except Exception as ex:
logger.error("Error _reconcile state {:s} {:s}".format(e, str(ex)))

def stop(self):
"""Stops this controller thread"""
self.running = False
self.workqueue.put(None)

def _reconcile_state(self, resource_key):
"""Make changes to go from current state to desired state and update
resource status."""
logger.info("Reconcile state: {:s}".format(resource_key))
ns, name = resource_key.split("/")

# Get resource if it exists
try:
immortalcontainer = self.customsapi.get_namespaced_custom_object(
self.custom_group, self.custom_version, ns, self.custom_plural, name)
except ApiException as e:
if e.status == 404:
logger.info("Element {:s} in workqueue no longel exist".format(resource_key))
return
raise e

# Get resource status
status = self._get_status(immortalcontainer)

# Get associated pod
pod = None
if status['currentPod'] != "":
try:
pod = self.corev1api.read_namespaced_pod(status['currentPod'], ns)
except ApiException as e:
if e.status != 404:
logger.info("Error retrieving pod {:s} for immortalcontainer {:s}".format(status['currentPod'], resource_key))
raise e

if pod is None:
# If no pod exists create one
pod_request = self._new_pod(immortalcontainer)
pod = self.corev1api.create_namespaced_pod(ns, pod_request)

# update status
self._update_status(immortalcontainer, pod)

def _update_status(self, immortalcontainer, pod):
"""Updates an ImmortalContainer status"""
new_status = self._calculate_status(immortalcontainer, pod)
self.customsapi.patch_namespaced_custom_object(
self.custom_group, self.custom_version,
immortalcontainer['metadata']['namespace'],
self.custom_plural, immortalcontainer['metadata']['name'],
new_status
)

def _calculate_status(self, immortalcontainer, pod):
"""Calculates what the status of an ImmortalContainer should be """
new_status = copy.deepcopy(immortalcontainer)
new_status['status'] = dict(currentPod=pod.metadata.name)
return new_status


def _get_status(self, immortalcontainer):
"""Get the status from an ImmortalContainer. If `immortalcontainer`
has no status, returns a default status."""
if 'status' in immortalcontainer:
return immortalcontainer['status']
else:
return dict(currentPod='')

def _new_pod(self, immortalcontainer):
"""Returns the pod definition to create the pod for an ImmortalContainer"""
labels = dict(controller=immortalcontainer['metadata']['name'])
return models.V1Pod(
metadata=models.V1ObjectMeta(
generate_name="immortalpod-",
labels=labels,
namespace=immortalcontainer['metadata']['namespace'],
owner_references=[models.V1OwnerReference(
api_version=self.custom_group+"/"+self.custom_version,
controller=True,
kind=self.custom_kind,
name=immortalcontainer['metadata']['name'],
uid=immortalcontainer['metadata']['uid']
)]),
spec=models.V1PodSpec(
containers=[
models.V1Container(
name="acontainer",
image=immortalcontainer['spec']['image']
)
]
)
)
4 changes: 4 additions & 0 deletions src/defs.py
@@ -0,0 +1,4 @@
CUSTOM_GROUP = 'exampleoperator.flugel.it'
CUSTOM_VERSION = 'v1alpha1'
CUSTOM_PLURAL = 'immortalcontainers'
CUSTOM_KIND = 'ImmortalContainer'
48 changes: 48 additions & 0 deletions src/main.py
@@ -0,0 +1,48 @@
import argparse
import logging
import sys

from kubernetes import client, config

import defs
from controller import Controller
from threadedwatch import ThreadedWatchStream

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()


def main():
parser = argparse.ArgumentParser()
parser.add_argument('--kubeconfig', help='path tu kubeconfig file, only required if running outside of a cluster')
args = parser.parse_args()
if args.kubeconfig is not None:
config.load_kube_config()
else:
config.load_incluster_config()

corev1api = client.CoreV1Api()
customsapi = client.CustomObjectsApi()

# Changing this it's possible to work on all the namespaces or choose only one
pods_watcher = ThreadedWatchStream(corev1api.list_pod_for_all_namespaces)
immortalcontainers_watcher = ThreadedWatchStream(
customsapi.list_cluster_custom_object, defs.CUSTOM_GROUP,
defs.CUSTOM_VERSION, defs.CUSTOM_PLURAL
)
controller = Controller(pods_watcher, immortalcontainers_watcher, corev1api,
customsapi, defs.CUSTOM_GROUP, defs.CUSTOM_VERSION,
defs.CUSTOM_PLURAL, defs.CUSTOM_KIND)

controller.start()
pods_watcher.start()
immortalcontainers_watcher.start()
try:
controller.join()
except (KeyboardInterrupt, SystemExit):
print('\n! Received keyboard interrupt, quitting threads.\n')
controller.stop()
controller.join()

if __name__ == '__main__':
main()

0 comments on commit ce5db37

Please sign in to comment.