Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run job and job run status #8

Merged
merged 1 commit into from Jan 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions example/kube-manifest.yaml
Expand Up @@ -15,10 +15,12 @@ rules:
- "batch"
resources:
- cronjobs
- jobs
verbs:
- get
- list
- delete
- create
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
Expand Down
4 changes: 2 additions & 2 deletions src/api/api.go
Expand Up @@ -48,6 +48,6 @@ func (a *Api) Start() {
}

// Output HTTP 500 with JSON body containing error message
func JsonError(c *gin.Context, message string) {
c.JSON(500, gin.H{"message": message})
func JsonError(c *gin.Context, statusCode int, message string) {
c.JSON(statusCode, gin.H{"message": message})
}
18 changes: 9 additions & 9 deletions src/api/jobs.go
Expand Up @@ -11,17 +11,17 @@ func handleGetJobs(c *gin.Context) {
ret := []*helpers.MetronomeJob{}
namespaces, err := kube.GetNamespaces()
if err != nil {
JsonError(c, fmt.Sprintf("Failed to list namespaces: %s", err))
JsonError(c, 500, fmt.Sprintf("Failed to list namespaces: %s", err))
return
}
for _, namespace := range namespaces {
jobs, err := kube.GetCronJobs(namespace)
if err != nil {
JsonError(c, fmt.Sprintf("Failed to list jobs: %s", err))
JsonError(c, 500, fmt.Sprintf("Failed to list jobs: %s", err))
return
}
for _, job := range jobs {
tmp_job := helpers.JobKubernetesToMetronome(&job)
tmp_job := helpers.CronJobKubernetesToMetronome(&job)
ret = append(ret, tmp_job)
}
}
Expand All @@ -36,15 +36,15 @@ func handleGetJob(c *gin.Context) {
jobId := c.Param("jobid")
namespace, name, err := helpers.SplitMetronomeJobId(jobId)
if err != nil {
JsonError(c, err.Error())
JsonError(c, 500, err.Error())
return
}
job, err := kube.GetCronJob(namespace, name)
if err != nil {
JsonError(c, fmt.Sprintf("cannot retrieve job: %s", err))
JsonError(c, 404, fmt.Sprintf("cannot retrieve job: %s", err))
return
}
tmp_job := helpers.JobKubernetesToMetronome(job)
tmp_job := helpers.CronJobKubernetesToMetronome(job)
c.JSON(200, tmp_job)
}

Expand All @@ -56,7 +56,7 @@ func handleDeleteJob(c *gin.Context) {
jobId := c.Param("jobid")
namespace, name, err := helpers.SplitMetronomeJobId(jobId)
if err != nil {
JsonError(c, err.Error())
JsonError(c, 500, err.Error())
return
}

Expand All @@ -69,10 +69,10 @@ func handleDeleteJob(c *gin.Context) {
c.JSON(404, msg)
return
} else if err != nil {
JsonError(c, fmt.Sprintf("failed to delete job: %s", err))
JsonError(c, 500, fmt.Sprintf("failed to delete job: %s", err))
return
}

tmp_job := helpers.JobKubernetesToMetronome(job)
tmp_job := helpers.CronJobKubernetesToMetronome(job)
c.JSON(200, tmp_job)
}
36 changes: 34 additions & 2 deletions src/api/runs.go
@@ -1,6 +1,9 @@
package api

import (
"fmt"
"github.com/applauseoss/metronomikon/helpers"
"github.com/applauseoss/metronomikon/kube"
"github.com/gin-gonic/gin"
)

Expand All @@ -9,11 +12,40 @@ func handleGetJobRuns(c *gin.Context) {
}

func handleTriggerJobRun(c *gin.Context) {
c.String(200, "TODO")
jobId := c.Param("jobid")
namespace, name, err := helpers.SplitMetronomeJobId(jobId)
if err != nil {
JsonError(c, 500, err.Error())
return
}
cronJob, err := kube.GetCronJob(namespace, name)
if err != nil {
JsonError(c, 404, fmt.Sprintf("cannot retrieve job: %s", err))
return
}
job, err := kube.CreateJobFromCronjob(cronJob)
if err != nil {
JsonError(c, 500, fmt.Sprintf("cannot run job: %s", err))
return
}
c.JSON(201, helpers.JobKubernetesToMetronome(job))
}

func handleGetJobRun(c *gin.Context) {
c.String(200, "TODO")
// This API endpoint also takes a 'jobid' parameter, but we don't need it
// because each job run has a unique name already in kubernetes
runId := c.Param("runid")
namespace, name, err := helpers.SplitMetronomeJobId(runId)
if err != nil {
JsonError(c, 500, err.Error())
return
}
job, err := kube.GetJob(namespace, name)
if err != nil {
JsonError(c, 404, fmt.Sprintf("cannot get job: %s", err))
return
}
c.JSON(200, helpers.JobKubernetesToMetronome(job))
}

func handleStopJobRun(c *gin.Context) {
Expand Down
40 changes: 36 additions & 4 deletions src/helpers/helpers.go
Expand Up @@ -3,7 +3,8 @@ package helpers
import (
"fmt"
"github.com/applauseoss/metronomikon/config"
v1beta1 "k8s.io/api/batch/v1beta1"
batchv1 "k8s.io/api/batch/v1"
batchv1beta1 "k8s.io/api/batch/v1beta1"
"strings"
)

Expand Down Expand Up @@ -49,14 +50,23 @@ type MetronomeJob struct {
} `json:"run"`
}

type MetronomeJobRun struct {
CompletedAt *string `json:"completedAt"` // we use a pointer so that we can get a null in the JSON if not populated
CreatedAt string `json:"createdAt"`
Id string `json:"id"`
JobId string `json:"jobId"`
Status string `json:"status"`
Tasks []string `json:"tasks"`
}

// Convert Kubernetes CronJob to Metronome format
func JobKubernetesToMetronome(job *v1beta1.CronJob) *MetronomeJob {
func CronJobKubernetesToMetronome(cronJob *batchv1beta1.CronJob) *MetronomeJob {
ret := &MetronomeJob{}
cfg := config.GetConfig()
ret.Id = fmt.Sprintf("%s.%s", job.ObjectMeta.Namespace, job.ObjectMeta.Name)
ret.Id = fmt.Sprintf("%s.%s", cronJob.ObjectMeta.Namespace, cronJob.ObjectMeta.Name)
// Metronome only supports a single container, so we grab the first one
// XXX: make this configurable?
container := job.Spec.JobTemplate.Spec.Template.Spec.Containers[0]
container := cronJob.Spec.JobTemplate.Spec.Template.Spec.Containers[0]
ret.Run.Docker.Image = container.Image
ret.Run.Args = container.Args
if len(container.Command) > 0 {
Expand All @@ -69,6 +79,28 @@ func JobKubernetesToMetronome(job *v1beta1.CronJob) *MetronomeJob {
return ret
}

// Convert Kubernetes Job to Metronome job run format
func JobKubernetesToMetronome(job *batchv1.Job) *MetronomeJobRun {
ret := &MetronomeJobRun{}
ret.Id = fmt.Sprintf("%s.%s", job.ObjectMeta.Namespace, job.ObjectMeta.Name)
ret.JobId = fmt.Sprintf("%s.%s", job.ObjectMeta.Namespace, job.ObjectMeta.OwnerReferences[0].Name)
ret.CreatedAt = job.ObjectMeta.CreationTimestamp.String()
if job.Status.StartTime == nil {
ret.Status = "STARTING"
} else if job.Status.CompletionTime == nil {
ret.Status = "RUNNING"
} else if job.Status.Failed > 0 {
ret.Status = "FAILED"
} else {
agaffney marked this conversation as resolved.
Show resolved Hide resolved
ret.Status = "COMPLETED"
// We need a temp var to be able to use the address of it in the assignment below
completionTime := job.Status.CompletionTime.String()
ret.CompletedAt = &completionTime
}
ret.Tasks = make([]string, 0)
return ret
}

// Split Metronome job ID into Kubernetes namespace and job name
func SplitMetronomeJobId(jobId string) (string, string, error) {
parts := strings.SplitN(jobId, ".", 2)
Expand Down
49 changes: 45 additions & 4 deletions src/kube/jobs.go
Expand Up @@ -2,15 +2,18 @@ package kube

import (
"fmt"
v1beta1 "k8s.io/api/batch/v1beta1"
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
batchv1beta1 "k8s.io/api/batch/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"time"
)

// Delete specific CronJob
// Returns nil and error when job doesn't exist
// Return job and error when deleting job failed
// Return job and nil when deletion succeeded
func DeleteCronJob(namespace string, name string) (*v1beta1.CronJob, error) {
func DeleteCronJob(namespace string, name string) (*batchv1beta1.CronJob, error) {
job, err := GetCronJob(namespace, name)
if err != nil {
return nil, err
Expand All @@ -23,7 +26,7 @@ func DeleteCronJob(namespace string, name string) (*v1beta1.CronJob, error) {
}

// Return all CronJobs in a namespace
func GetCronJobs(namespace string) ([]v1beta1.CronJob, error) {
func GetCronJobs(namespace string) ([]batchv1beta1.CronJob, error) {
jobs, err := client.BatchV1beta1().CronJobs(namespace).List(metav1.ListOptions{})
if err != nil {
return nil, fmt.Errorf("could not list CronJobs: %s", err)
Expand All @@ -32,10 +35,48 @@ func GetCronJobs(namespace string) ([]v1beta1.CronJob, error) {
}

// Return specific CronJob
func GetCronJob(namespace string, name string) (*v1beta1.CronJob, error) {
func GetCronJob(namespace string, name string) (*batchv1beta1.CronJob, error) {
job, err := client.BatchV1beta1().CronJobs(namespace).Get(name, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("could not get CronJob: %s", err)
}
return job, nil
}

// Create Job from CronJob
func CreateJobFromCronjob(cronJob *batchv1beta1.CronJob) (*batchv1.Job, error) {
// This duplicates the logic used by kubectl to create a Job from a CronJob
annotations := make(map[string]string)
annotations["cronjob.kubernetes.io/instantiate"] = "manual"
for k, v := range cronJob.Spec.JobTemplate.Annotations {
annotations[k] = v
}

jobDef := &batchv1.Job{
TypeMeta: metav1.TypeMeta{APIVersion: batchv1.SchemeGroupVersion.String(), Kind: "Job"},
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-%d", cronJob.ObjectMeta.Name, time.Now().Unix()),
Annotations: annotations,
Labels: cronJob.Spec.JobTemplate.Labels,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(cronJob, appsv1.SchemeGroupVersion.WithKind("CronJob")),
},
},
Spec: cronJob.Spec.JobTemplate.Spec,
}

if job, err := client.BatchV1().Jobs(cronJob.ObjectMeta.Namespace).Create(jobDef); err != nil {
return nil, err
} else {
return job, nil
}
}

// Return specific Job
func GetJob(namespace string, name string) (*batchv1.Job, error) {
job, err := client.BatchV1().Jobs(namespace).Get(name, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("could not get Job: %s", err)
}
return job, nil
}
59 changes: 45 additions & 14 deletions test/run-test.py
Expand Up @@ -3,30 +3,47 @@
from __future__ import print_function

import json
import re
import requests
import string
import sys
import time

def compare_structures(data1, data2):
PRESERVED_KEYS = {}

# Borrowed from the 'six' library so we don't need it as a dependency
PY2 = sys.version_info[0] == 2
PY3 = sys.version_info[0] == 3
if PY3:
string_types = (str,)
else:
string_types = (basestring,)

def compare_structures(data1, data2, use_regex=False):
if type(data1) != type(data2):
return False
if isinstance(data1, list):
if len(data1) != len(data2):
return False
for idx in range(0, len(data1)):
if not compare_structures(data1[idx], data2[idx]):
if not compare_structures(data1[idx], data2[idx], use_regex=use_regex):
return False
elif isinstance(data2, dict):
if sorted(data1.keys()) != sorted(data2.keys()):
return False
for key in data1:
if not compare_structures(data1[key], data2[key]):
if not compare_structures(data1[key], data2[key], use_regex=use_regex):
return False
else:
if use_regex and isinstance(data1, string_types):
if re.match(data2, data1) is None:
return False
return True
return (data1 == data2)
return True

def perform_request(test_data, url):
global PRESERVED_KEYS
if test_data['method'] == 'GET':
r = requests.get(url)
elif test_data['method'] == 'PUT':
Expand All @@ -49,27 +66,41 @@ def perform_request(test_data, url):
return False
elif 'responseJsonFile' in test_data:
response_data = json.load(open('%s/%s' % (TEST_DIR, test_data['responseJsonFile'])))
if not compare_structures(r.json(), response_data):
if not compare_structures(r.json(), response_data, use_regex=test_data.get('useRegex', False)):
print('Got response JSON:\n\n%s\n\nexpected:\n\n%s' % (r.text, json.dumps(response_data)))
return False
if test_data.get('preserveKeys', False):
PRESERVED_KEYS = r.json()
return True

ENDPOINT = sys.argv[1]
TEST_DIR = sys.argv[2]

test_data = json.load(open('%s/metadata.json' % TEST_DIR))

retries = test_data.get('retries', 0)
if 'steps' in test_data:
steps = test_data['steps']
else:
steps = [test_data]

print('Running test step "%s"...' % test_data['name'])
for step in steps:
retries = step.get('retries', 0)

url = '%s%s' % (ENDPOINT, test_data['urlPath'])
print('Running test step "%s"...' % step['name'])

print('Performing %s request against URL %s' % (test_data['method'], url))
url = '%s%s' % (ENDPOINT, string.Template(step['urlPath']).safe_substitute(PRESERVED_KEYS))

for i in range(retries + 1):
if perform_request(test_data, url):
sys.exit(0)
print("Try %s in %s failed" % (i+1, retries + 1 ))
time.sleep(1)
sys.exit(1)
print('Performing %s request against URL %s' % (step['method'], url))

test_success = False
for i in range(retries + 1):
if perform_request(step, url):
test_success = True
break
print("Attempt %s of %s failed" % (i+1, retries + 1 ))
time.sleep(1)

if test_success:
continue
else:
sys.exit(1)
4 changes: 2 additions & 2 deletions test/run.sh
Expand Up @@ -47,8 +47,8 @@ get_endpoint() {

run_tests() {
get_endpoint
for i in $(ls -1 ${BASEDIR}/steps | sort); do
${BASEDIR}/run-test.py ${ENDPOINT} ${BASEDIR}/steps/${i} || die "test step failed"
for i in $(ls -1 ${BASEDIR}/tests | sort); do
${BASEDIR}/run-test.py ${ENDPOINT} ${BASEDIR}/tests/${i} || die "test step failed"
done
}

Expand Down
File renamed without changes.