Skip to content

Commit

Permalink
kubelet/rkt: Add RunPod() for rkt.
Browse files Browse the repository at this point in the history
  • Loading branch information
Yifan Gu committed Apr 30, 2015
1 parent 2c381a6 commit f5a65c3
Showing 1 changed file with 151 additions and 0 deletions.
151 changes: 151 additions & 0 deletions pkg/kubelet/rkt/rkt.go
Expand Up @@ -17,9 +17,17 @@ limitations under the License.
package rkt

import (
"encoding/json"
"fmt"
"hash/adler32"
"io"
"io/ioutil"
"os"
"os/exec"
"path"
"strings"
"syscall"
"time"

"github.com/GoogleCloudPlatform/kubernetes/pkg/credentialprovider"
"github.com/coreos/go-systemd/dbus"
Expand Down Expand Up @@ -138,3 +146,146 @@ func (r *Runtime) runCommand(args ...string) ([]string, error) {
}
return strings.Split(strings.TrimSpace(string(output)), "\n"), nil
}

func newUnitOption(section, name, value string) *unit.UnitOption {
return &unit.UnitOption{Section: section, Name: name, Value: value}
}

// TODO(yifan): Move this duplicated function to container runtime.
// hashContainer computes the hash of one api.Container.
func hashContainer(container *api.Container) uint64 {
hash := adler32.New()
util.DeepHashObject(hash, *container)
return uint64(hash.Sum32())
}

// TODO(yifan): Remove the receiver once we can solve the appName->imageID problem.
func (r *Runtime) apiPodToRuntimePod(uuid string, pod *api.Pod) *kubecontainer.Pod {
p := &kubecontainer.Pod{
ID: pod.UID,
Name: pod.Name,
Namespace: pod.Namespace,
}
for i := range pod.Spec.Containers {
c := &pod.Spec.Containers[i]
imageID, err := r.getImageID(c.Image)
if err != nil {
glog.Warningf("rkt: Cannot get image id: %v", err)
}
p.Containers = append(p.Containers, &kubecontainer.Container{
ID: types.UID(buildContainerID(&containerID{uuid, c.Name, imageID})),
Name: c.Name,
Image: c.Image,
Hash: hashContainer(c),
Created: time.Now().Unix(),
})
}
return p
}

// preparePod will:
//
// 1. Invoke 'rkt prepare' to prepare the pod, and get the rkt pod uuid.
// 2. Creates the unit file and save it under systemdUnitDir.
//
// On success, it will return a string that represents name of the unit file
// and a boolean that indicates if the unit file needs to be reloaded (whether
// the file is already existed).
func (r *Runtime) preparePod(pod *api.Pod, volumeMap map[string]volume.Volume) (string, bool, error) {
cmds := []string{"prepare", "--quiet", "--pod-manifest"}

// Generate the pod manifest from the pod spec.
manifest, err := r.makePodManifest(pod, volumeMap)
if err != nil {
return "", false, err
}
manifestFile, err := ioutil.TempFile("", "manifest")
if err != nil {
return "", false, err
}
defer func() {
manifestFile.Close()
if err := os.Remove(manifestFile.Name()); err != nil {
glog.Warningf("rkt: Cannot remove temp manifest file %q: %v", manifestFile.Name(), err)
}
}()

data, err := json.Marshal(manifest)
if err != nil {
return "", false, err
}
// Since File.Write returns error if the written length is less than len(data),
// so check error is enough for us.
if _, err := manifestFile.Write(data); err != nil {
return "", false, err
}

cmds = append(cmds, manifestFile.Name())
output, err := r.runCommand(cmds...)
if err != nil {
return "", false, err
}
if len(output) != 1 {
return "", false, fmt.Errorf("cannot get uuid from 'rkt prepare'")
}
uuid := output[0]
glog.V(4).Infof("'rkt prepare' returns %q.", uuid)

p := r.apiPodToRuntimePod(uuid, pod)
b, err := json.Marshal(p)
if err != nil {
return "", false, err
}

runPrepared := fmt.Sprintf("%s run-prepared --private-net=%v %s", r.absPath, pod.Spec.HostNetwork, uuid)
units := []*unit.UnitOption{
newUnitOption(unitKubernetesSection, unitRktID, uuid),
newUnitOption(unitKubernetesSection, unitPodName, string(b)),
newUnitOption("Service", "ExecStart", runPrepared),
}

// Save the unit file under systemd's service directory.
// TODO(yifan) Garbage collect 'dead' serivce files.
needReload := false
unitName := makePodServiceFileName(pod.UID)
if _, err := os.Stat(path.Join(systemdServiceDir, unitName)); err == nil {
needReload = true
}
unitFile, err := os.Create(path.Join(systemdServiceDir, unitName))
if err != nil {
return "", false, err
}
defer unitFile.Close()

_, err = io.Copy(unitFile, unit.Serialize(units))
if err != nil {
return "", false, err
}
return unitName, needReload, nil
}

// RunPod first creates the unit file for a pod, and then calls
// StartUnit over d-bus.
func (r *Runtime) RunPod(pod *api.Pod, volumeMap map[string]volume.Volume) error {
glog.V(4).Infof("Rkt starts to run pod: name %q.", pod.Name)

name, needReload, err := r.preparePod(pod, volumeMap)
if err != nil {
return err
}
if needReload {
// TODO(yifan): More graceful stop. Replace with StopUnit and wait for a timeout.
r.systemd.KillUnit(name, int32(syscall.SIGKILL))
if err := r.systemd.Reload(); err != nil {
return err
}
}

// TODO(yifan): This is the old version of go-systemd. Should update when libcontainer updates
// its version of go-systemd.
_, err = r.systemd.StartUnit(name, "replace")
if err != nil {
return err
}
return nil
}

0 comments on commit f5a65c3

Please sign in to comment.