diff --git a/pkg/kubelet/rkt/rkt.go b/pkg/kubelet/rkt/rkt.go index c8bc2061e740..840d5edba4af 100644 --- a/pkg/kubelet/rkt/rkt.go +++ b/pkg/kubelet/rkt/rkt.go @@ -19,17 +19,27 @@ package rkt import ( "encoding/json" "fmt" + "hash/adler32" + "io" + "io/ioutil" + "os" "os/exec" + "path" "strings" + "syscall" + "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities" "github.com/GoogleCloudPlatform/kubernetes/pkg/credentialprovider" + kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" "github.com/GoogleCloudPlatform/kubernetes/pkg/types" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/volume" appcschema "github.com/appc/spec/schema" appctypes "github.com/appc/spec/schema/types" "github.com/coreos/go-systemd/dbus" + "github.com/coreos/go-systemd/unit" "github.com/coreos/rkt/store" "github.com/golang/glog" ) @@ -382,3 +392,146 @@ func (r *Runtime) getImageID(imageName string) (string, error) { } return last, nil } + +func newUnitOption(section, name, value string) *unit.UnitOption { + return &unit.UnitOption{Section: section, Name: name, Value: value} +} + +// TODO(yifan): Move this duplicated function to container runtime. +// hashContainer computes the hash of one api.Container. +func hashContainer(container *api.Container) uint64 { + hash := adler32.New() + util.DeepHashObject(hash, *container) + return uint64(hash.Sum32()) +} + +// TODO(yifan): Remove the receiver once we can solve the appName->imageID problem. +func (r *Runtime) apiPodToRuntimePod(uuid string, pod *api.Pod) *kubecontainer.Pod { + p := &kubecontainer.Pod{ + ID: pod.UID, + Name: pod.Name, + Namespace: pod.Namespace, + } + for i := range pod.Spec.Containers { + c := &pod.Spec.Containers[i] + imageID, err := r.getImageID(c.Image) + if err != nil { + glog.Warningf("rkt: Cannot get image id: %v", err) + } + p.Containers = append(p.Containers, &kubecontainer.Container{ + ID: types.UID(buildContainerID(&containerID{uuid, c.Name, imageID})), + Name: c.Name, + Image: c.Image, + Hash: hashContainer(c), + Created: time.Now().Unix(), + }) + } + return p +} + +// preparePod will: +// +// 1. Invoke 'rkt prepare' to prepare the pod, and get the rkt pod uuid. +// 2. Creates the unit file and save it under systemdUnitDir. +// +// On success, it will return a string that represents name of the unit file +// and a boolean that indicates if the unit file needs to be reloaded (whether +// the file is already existed). +func (r *Runtime) preparePod(pod *api.Pod, volumeMap map[string]volume.Volume) (string, bool, error) { + cmds := []string{"prepare", "--quiet", "--pod-manifest"} + + // Generate the pod manifest from the pod spec. + manifest, err := r.makePodManifest(pod, volumeMap) + if err != nil { + return "", false, err + } + manifestFile, err := ioutil.TempFile("", "manifest") + if err != nil { + return "", false, err + } + defer func() { + manifestFile.Close() + if err := os.Remove(manifestFile.Name()); err != nil { + glog.Warningf("rkt: Cannot remove temp manifest file %q: %v", manifestFile.Name(), err) + } + }() + + data, err := json.Marshal(manifest) + if err != nil { + return "", false, err + } + // Since File.Write returns error if the written length is less than len(data), + // so check error is enough for us. + if _, err := manifestFile.Write(data); err != nil { + return "", false, err + } + + cmds = append(cmds, manifestFile.Name()) + output, err := r.runCommand(cmds...) + if err != nil { + return "", false, err + } + if len(output) != 1 { + return "", false, fmt.Errorf("cannot get uuid from 'rkt prepare'") + } + uuid := output[0] + glog.V(4).Infof("'rkt prepare' returns %q.", uuid) + + p := r.apiPodToRuntimePod(uuid, pod) + b, err := json.Marshal(p) + if err != nil { + return "", false, err + } + + runPrepared := fmt.Sprintf("%s run-prepared --private-net=%v %s", r.absPath, pod.Spec.HostNetwork, uuid) + units := []*unit.UnitOption{ + newUnitOption(unitKubernetesSection, unitRktID, uuid), + newUnitOption(unitKubernetesSection, unitPodName, string(b)), + newUnitOption("Service", "ExecStart", runPrepared), + } + + // Save the unit file under systemd's service directory. + // TODO(yifan) Garbage collect 'dead' service files. + needReload := false + unitName := makePodServiceFileName(pod.UID) + if _, err := os.Stat(path.Join(systemdServiceDir, unitName)); err == nil { + needReload = true + } + unitFile, err := os.Create(path.Join(systemdServiceDir, unitName)) + if err != nil { + return "", false, err + } + defer unitFile.Close() + + _, err = io.Copy(unitFile, unit.Serialize(units)) + if err != nil { + return "", false, err + } + return unitName, needReload, nil +} + +// RunPod first creates the unit file for a pod, and then calls +// StartUnit over d-bus. +func (r *Runtime) RunPod(pod *api.Pod, volumeMap map[string]volume.Volume) error { + glog.V(4).Infof("Rkt starts to run pod: name %q.", pod.Name) + + name, needReload, err := r.preparePod(pod, volumeMap) + if err != nil { + return err + } + if needReload { + // TODO(yifan): More graceful stop. Replace with StopUnit and wait for a timeout. + r.systemd.KillUnit(name, int32(syscall.SIGKILL)) + if err := r.systemd.Reload(); err != nil { + return err + } + } + + // TODO(yifan): This is the old version of go-systemd. Should update when libcontainer updates + // its version of go-systemd. + _, err = r.systemd.StartUnit(name, "replace") + if err != nil { + return err + } + return nil +}