Skip to content

Commit

Permalink
Make coordinator independent of docker (#1523)
Browse files Browse the repository at this point in the history
* chore(*): remove unnecessary binary

* docs(*): update stage-execution-result.md

* refactor(resolver): update the result file path

* refactor(co): make coordinator independent of docker

* refactor(workflow): update the volumes mounted in pod

* style(resolver-http): double-quoted vars

* chore(*): make linter happy

* chore(*): add comments

* refactor(*): change ResultDirSubPath to `results`
  • Loading branch information
Jian Zeng committed Nov 18, 2020
1 parent e747dc8 commit 46582f4
Show file tree
Hide file tree
Showing 10 changed files with 102 additions and 133 deletions.
8 changes: 5 additions & 3 deletions build/cicd/sonarqube/entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ if [ -z ${SOURCE_PATH} ]; then echo "SOURCE_PATH is unset, set it to ./"; SOURCE
# Trim suffix "/" for the server
SERVER=$(echo ${SERVER} | sed -e 's/\/$//')

RESULT_PATH='/cyclone/results/__result__'

# Create project if not exist
status=$(curl -I -u ${TOKEN}: ${SERVER}/api/components/show?component=${PROJECT_KEY} 2>/dev/null | head -n 1 | cut -d$' ' -f2)
if [[ $status == "404" ]]; then
Expand Down Expand Up @@ -120,13 +122,13 @@ done;
echo "Scan task completed~"

# Write result to output file, which will be collected by Cyclone
echo "Collect result to result file /__result__ ..."
echo "detailURL:${SERVER}/dashboard?id=${PROJECT_KEY}" >> /__result__;
echo "Collect result to result file $RESULT_PATH ..."
echo "detailURL:${SERVER}/dashboard?id=${PROJECT_KEY}" >> "$RESULT_PATH";
# Can reference measures result in 'result.example.json' file in current directory
measures=$(curl -XPOST -u ${TOKEN}: "${SERVER}/api/measures/component?additionalFields=periods&component=${PROJECT_KEY}&metricKeys=reliability_rating,sqale_rating,security_rating,coverage,duplicated_lines_density,quality_gate_details" 2>/dev/null)
selected=$(echo $measures | jq -c .component.measures)
echo $selected | jq
echo "measures:${selected}" >> /__result__;
echo "measures:${selected}" >> "$RESULT_PATH"

# Determine success or failure
qualityGateValue=$(echo $selected | jq '.[] | select(.metric=="quality_gate_details") | .value')
Expand Down
9 changes: 5 additions & 4 deletions build/resolver/git/entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ if [ "${SCM_TYPE}" = "Bitbucket" ] && [ -z "${SCM_USER}" ]; then echo "WARN: SCM

GIT_DEPTH_OPTION="--depth=1"
GIT_DEPTH_OPTION_DEEPER="--depth=30"
RESULT_PATH='/cyclone/results/__result__'

# If SCM_REPO is provided, embed it to SCM_URL
if [ ! -z "${SCM_REPO}" ]; then
Expand Down Expand Up @@ -151,10 +152,10 @@ wrapPull() {
fi

# Write commit id to output file, which will be collected by Cyclone
cd $WORKDIR/data
echo "Collect commit id to result file /__result__ ..."
echo "LastCommitID:`git log -n 1 --pretty=format:"%H"`" > /__result__;
cat /__result__;
cd "$WORKDIR/data"
echo "Collect commit id to result file $RESULT_PATH ..."
echo "LastCommitID:$(git log -n 1 --pretty=format:"%H")" > "$RESULT_PATH"
cat "$RESULT_PATH"
}

# Revision can be in two different format:
Expand Down
24 changes: 12 additions & 12 deletions build/resolver/http/entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ fi
COMMAND=$1

# Check whether environment variables are set.
if [ -z ${WORKDIR} ]; then echo "WORKDIR is unset"; exit 1; fi
if [ -z ${URL} ]; then echo "URL is unset"; exit 1; fi
if [ -z ${METHOD} ]; then echo "METHOD is unset"; exit 1; fi
if [ -z "${WORKDIR}" ]; then echo "WORKDIR is unset"; exit 1; fi
if [ -z "${URL}" ]; then echo "URL is unset"; exit 1; fi
if [ -z "${METHOD}" ]; then echo "METHOD is unset"; exit 1; fi

# replease string '${METADATA_NAMESPACE}' '${WORKFLOW_NAME}' '${STAGE_NAME}' '${WORKFLOWRUN_NAME}' in URL with corresponding real value
URL=${URL//'${METADATA_NAMESPACE}'/${METADATA_NAMESPACE}}
Expand All @@ -69,25 +69,25 @@ URL=${URL//'${STAGE_NAME}'/${STAGE_NAME}}
URL=${URL//'${WORKFLOWRUN_NAME}'/${WORKFLOWRUN_NAME}}

wrapPush() {
if [ -z ${COMPRESS_FILE_NAME} ]; then COMPRESS_FILE_NAME="artifact.tar"; fi
if [ -z ${FORM_FILE_KEY} ]; then FORM_FILE_KEY="file"; fi
if [ -z ${FIND_OPTIONS} ]; then FIND_OPTIONS=". -name '*'"; fi
if [ -z "${COMPRESS_FILE_NAME}" ]; then COMPRESS_FILE_NAME="artifact.tar"; fi
if [ -z "${FORM_FILE_KEY}" ]; then FORM_FILE_KEY="file"; fi
if [ -z "${FIND_OPTIONS}" ]; then FIND_OPTIONS=". -name '*'"; fi

cd ${WORKDIR}/data/${DATA_SUBDIR}
mkdir -p ${WORKDIR}/__output_resources;
cd "${WORKDIR}/data/${DATA_SUBDIR}"
mkdir -p "${WORKDIR}/__output_resources"

echo "Start to find and copy files: find ${FIND_OPTIONS} -exec cp --parents {} ${WORKDIR}/__output_resources \;"
eval "find ${FIND_OPTIONS} -exec cp --parents {} ${WORKDIR}/__output_resources \;"
eval "find ${FIND_OPTIONS} -exec cp -v --parents {} ${WORKDIR}/__output_resources \;"

if [ -z "$(ls -A "${WORKDIR}/__output_resources")" ]; then
echo "No files should be sent, exit."
exit 0
fi

echo "Start to compress files under ${WORKDIR}/__output_resources into file ${COMPRESS_FILE_NAME}"
cd ${WORKDIR}/__output_resources
tar -cvf ${WORKDIR}/${COMPRESS_FILE_NAME} ./*
cd ${WORKDIR}
cd "${WORKDIR}/__output_resources"
tar -cvf "${WORKDIR}/${COMPRESS_FILE_NAME}" ./*
cd "${WORKDIR}"

for header in ${HEADERS}; do
headerString="-H ${header} ${headerString}"
Expand Down
Binary file removed cmd/toolbox/fstream/main
Binary file not shown.
10 changes: 2 additions & 8 deletions cmd/workflow/coordinator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func main() {
}

// Collect execution result from the workload container, results are key-value pairs in a
// specified file, /__result__
// specified file, /workspace/results/*/__result__
if err = c.CollectExecutionResults(); err != nil {
message = fmt.Sprintf("Collect execution results error: %v", err)
return
Expand All @@ -103,13 +103,6 @@ func main() {
return
}

// Collect all resources
log.Info("Start to collect resources.")
if err = c.CollectResources(); err != nil {
message = fmt.Sprintf("Stage %s failed to collect output resource, error: %v.", c.Stage.Name, err)
return
}

// Notify output resolver to start working.
log.Info("Start to notify resolvers.")
if err = c.NotifyResolvers(); err != nil {
Expand All @@ -118,6 +111,7 @@ func main() {
}

// Collect all artifacts
// TODO: remove this stage
log.Info("Start to collect artifacts.")
if err = c.CollectArtifacts(); err != nil {
message = fmt.Sprintf("Stage %s failed to collect artifacts, error: %v", c.Stage.Name, err)
Expand Down
4 changes: 2 additions & 2 deletions docs/stage-execution-result.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ status:

## How It Works

Execution result is collected from result file `/__result__` in workload container. So it you want some results to be saved to WorkflowRun status, you should write them to the result file `/__result__`.
Execution result is collected from result file `/cyclone/results/__result__` in workload container. So it you want some results to be saved to WorkflowRun status, you should write them to the result file `/cyclone/results/__result__`.

The result file is a plain text file with line format `<key>:<value>`. Here is a simple example stage with execution results generated:

Expand All @@ -40,7 +40,7 @@ spec:
command:
- /bin/sh
- -c
- echo "overall:Passed" >> /__result__
- echo "overall:Passed" >> /cyclone/results/__result__
```


19 changes: 17 additions & 2 deletions pkg/workflow/common/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package common

import (
"fmt"
"path/filepath"
)

// ContainerState represents container state.
Expand Down Expand Up @@ -83,12 +84,16 @@ const (
CoordinatorResolverNotifyOkPath = "/workspace/resolvers/notify/ok"
// CoordinatorArtifactsPath ...
CoordinatorArtifactsPath = "/workspace/artifacts"
// CoordinatorResultsPath is the directory that contains __result__ files written by other containers
CoordinatorResultsPath = "/workspace/results"

// ToolboxPath is path of cyclone tools in containers
ToolboxPath = "/usr/bin/cyclone-toolbox"
// ToolboxVolumeMountPath is mount path of the toolbox emptyDir volume mounted in container
ToolboxVolumeMountPath = "/cyclone-toolbox"

// OutputResourcesVolume is the name of the volume that contains resources generated by users.
OutputResourcesVolume = "output-resources-volume"
// DefaultPvVolumeName is name of the default PV used by all workflow stages.
DefaultPvVolumeName = "default-pv"
// ToolsVolume is name of the volume to inject cyclone tools to containers.
Expand All @@ -113,9 +118,14 @@ const (
// ContainerStateInitialized represents container is Running or Stopped, not Init or Creating.
ContainerStateInitialized ContainerState = "Initialized"

// ResultFilePath is file to hold execution result of a container that need to be synced to
// ResultFileDir contains the file `__result__` to hold execution result of a container that need to be synced to
// WorkflowRun status. Each line of the result should be in format: <key>:<value>
ResultFilePath = "/__result__"
ResultFileDir = "/cyclone/results"
// ResultDirSubPath defines the subPath of ResultFileDir in coordinator sidecar volume
ResultDirSubPath = "results"

// OutputResourcesDir contains the output resources generated by workload container
OutputResourcesDir = "/cyclone/resources"

// DefaultServiceAccountName is service account name used by stage pod
DefaultServiceAccountName = "cyclone"
Expand All @@ -142,3 +152,8 @@ func OutputResourceVolumeName(name string) string {
func PresetVolumeName(index int) string {
return fmt.Sprintf("preset-%d", index)
}

// ResultSubPath returns the subPath in the volume according to the containerName.
func ResultSubPath(containerName string) string {
return filepath.Join(ResultDirSubPath, containerName)
}
2 changes: 1 addition & 1 deletion pkg/workflow/controller/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func LoadConfig(cm *corev1.ConfigMap) error {
}

InitLogger(&Config.Logging)
log.Info("ResyncPeriod is %s", Config.ResyncPeriodSeconds*time.Second)
log.Infof("ResyncPeriod is %s", Config.ResyncPeriodSeconds*time.Second)
return nil
}

Expand Down
105 changes: 14 additions & 91 deletions pkg/workflow/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io/ioutil"
"os"
"path"
"path/filepath"
"strings"

log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -251,57 +252,6 @@ func (co *Coordinator) CollectArtifacts() error {
return nil
}

// CollectResources collects workload resources.
func (co *Coordinator) CollectResources() error {
if co.Stage.Spec.Pod == nil {
return fmt.Errorf("get stage output resources failed, stage pod nil")
}

resources := co.Stage.Spec.Pod.Outputs.Resources
if len(resources) == 0 {
log.Info("output resources empty, no need to collect.")
return nil
}

log.WithField("resources", resources).Info("start to collect.")

// Create the resources directory if not exist.
fileutil.CreateDirectory(common.CoordinatorResourcesPath)

for _, resource := range resources {
for _, r := range co.OutputResources {
if r.Name == resource.Name {
// If the resource is persisted in PVC, no need to copy here, Cyclone
// will mount it to resolver container directly.
if r.Spec.Persistent != nil {
continue
}
}
}

if len(resource.Path) == 0 {
continue
}

dst := path.Join(common.CoordinatorResourcesPath, resource.Name)
fileutil.CreateDirectory(dst)

id, err := co.getContainerID(co.workloadContainer)
if err != nil {
log.Errorf("get container %s's id failed: %v", co.workloadContainer, err)
return err
}

err = co.runtimeExec.CopyFromContainer(id, resource.Path, dst)
if err != nil {
log.Errorf("Copy container %s resources %s failed: %v", co.workloadContainer, resource.Name, err)
return err
}
}

return nil
}

// NotifyResolvers create a file to notify output resolvers to start working.
func (co *Coordinator) NotifyResolvers() error {
if co.Stage.Spec.Pod == nil {
Expand Down Expand Up @@ -365,29 +315,24 @@ func (co *Coordinator) getContainerID(name string) (string, error) {

// CollectExecutionResults collects execution results (key-values) and store them in pod's annotation
func (co *Coordinator) CollectExecutionResults() error {
pod, err := co.runtimeExec.GetPod()
if err != nil {
return err
}

var keyValues []v1alpha1.KeyValue

for _, c := range pod.Spec.Containers {
kv, err := co.extractExecutionResults(c.Name)
err := filepath.Walk(common.CoordinatorResultsPath, func(fp string, info os.FileInfo, err error) error {
if err != nil {
continue
return err
}
if info.Name() != "__result__" {
return nil
}

keyValues = append(keyValues, kv...)
}

for _, c := range pod.Spec.InitContainers {
kv, err := co.extractExecutionResults(c.Name)
kvs, err := readKeyValuesFromFile(fp)
if err != nil {
continue
return err
}

keyValues = append(keyValues, kv...)
keyValues = append(keyValues, kvs...)
return nil
})
if err != nil {
return err
}

if len(keyValues) > 0 {
Expand All @@ -400,30 +345,8 @@ func (co *Coordinator) CollectExecutionResults() error {
return nil
}

func isFileNotExist(err error) bool {
if err == nil {
return false
}

return strings.Contains(err.Error(), "No such container:path")
}

func (co *Coordinator) extractExecutionResults(containerName string) ([]v1alpha1.KeyValue, error) {
func readKeyValuesFromFile(dst string) ([]v1alpha1.KeyValue, error) {
var keyValues []v1alpha1.KeyValue
dst := fmt.Sprintf("/tmp/__result__%s", containerName)
containerID, err := co.getContainerID(containerName)
if err != nil {
log.WithField("c", containerID).Error("Get container ID error: ", err)
return keyValues, err
}
err = co.runtimeExec.CopyFromContainer(containerID, common.ResultFilePath, dst)
if isFileNotExist(err) {
return keyValues, err
}

if err != nil {
return keyValues, err
}

b, err := ioutil.ReadFile(dst)
if err != nil {
Expand Down

0 comments on commit 46582f4

Please sign in to comment.