Skip to content

Commit

Permalink
anawareness runtime prefetch
Browse files Browse the repository at this point in the history
1. add prefetchlist store prefetchlist storage service.
2. Modify the optimizer to publish the access file list as a prefetchlist to the storage service when obtaining it.
  • Loading branch information
billie60 committed Jan 5, 2024
1 parent 47d4311 commit 059d595
Show file tree
Hide file tree
Showing 3 changed files with 198 additions and 2 deletions.
98 changes: 97 additions & 1 deletion cmd/optimizer-nri-plugin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@
package main

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"log/syslog"
"net/http"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -43,6 +46,8 @@ type PluginConfig struct {
Readable bool `toml:"readable"`
Timeout int `toml:"timeout"`
Overwrite bool `toml:"overwrite"`
Domain string `toml:"domain"`
Port string `toml:"port"`
}

type PluginArgs struct {
Expand Down Expand Up @@ -104,6 +109,16 @@ func buildFlags(args *PluginArgs) []cli.Flag {
Usage: "whether to overwrite the existed persistent files",
Destination: &args.Config.Overwrite,
},
&cli.StringFlag{
Name: "domain",
Usage: "domain of prefetchlist store service",
Destination: &args.Config.Domain,
},
&cli.StringFlag{
Name: "port",
Usage: "port of prefetchlist store service",
Destination: &args.Config.Port,
},
}
}

Expand All @@ -129,9 +144,12 @@ var (
)

const (
imageNameLabel = "io.kubernetes.cri.image-name"
imageNameLabel = "io.kubernetes.cri.image-name"
containerNameLabel = "io.kubernetes.cri.container-name"
)

const defaultEndpoint = "/api/v1/prefetch/upload"

func (p *plugin) Configure(config, runtime, version string) (stub.EventMask, error) {
log.Infof("got configuration data: %q from runtime %s %s", config, runtime, version)
if config == "" {
Expand All @@ -156,11 +174,26 @@ func (p *plugin) Configure(config, runtime, version string) (stub.EventMask, err
return p.mask, nil
}

type PrefetchFile struct {
Path string
}

type CacheItem struct {
ImageName string
ContainerName string
PrefetchFiles []PrefetchFile
}

type Cache struct {
Items map[string]*CacheItem
}

func (p *plugin) StartContainer(_ *api.PodSandbox, container *api.Container) error {
dir, imageName, err := GetImageName(container.Annotations)
if err != nil {
return err
}
containerName := container.Annotations[containerNameLabel]

persistDir := filepath.Join(cfg.PersistDir, dir)
if err := os.MkdirAll(persistDir, os.ModePerm); err != nil {
Expand All @@ -178,11 +211,74 @@ func (p *plugin) StartContainer(_ *api.PodSandbox, container *api.Container) err
return err
}

imageRepo := container.Annotations[imageNameLabel]
serverURL := fmt.Sprintf("%s:%s", cfg.Domain, cfg.Port)

go func(imageRepo, persistFile, containerName, serverURL string) {
time.Sleep(1 * time.Minute)
if err := sendToServer(imageRepo, persistFile, containerName, serverURL); err != nil {
log.WithError(err).Error("failed to send prefetch to http server")
}
}(imageRepo, persistFile, containerName, serverURL)

globalFanotifyServer[imageName] = fanotifyServer

return nil
}

func sendToServer(imageName, prefetchlistPath, containerName, serverURL string) error {
data, err := os.ReadFile(prefetchlistPath)
if err != nil {
return fmt.Errorf("error reading file: %w", err)
}

filePaths := strings.Split(string(data), "\n")

var prefetchFiles []PrefetchFile
for _, path := range filePaths {
if path != "" {
prefetchFiles = append(prefetchFiles, PrefetchFile{Path: path})
}
}

item := CacheItem{
ImageName: imageName,
ContainerName: containerName,
PrefetchFiles: prefetchFiles,
}

url := fmt.Sprintf("http://%s%s", serverURL, defaultEndpoint)

err = postRequest(item, url)
if err != nil {
return fmt.Errorf("error uploading to server: %w", err)
}

return nil
}

func postRequest(item CacheItem, endpoint string) error {
data, err := json.Marshal(item)
if err != nil {
return err
}

resp, err := http.Post(endpoint, "application/json", bytes.NewBuffer(data))
if err != nil {
return err
}
defer resp.Body.Close()

body, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("failed to read response body: %w", err)
}

fmt.Println("Server Response:", string(body))

return nil
}

func (p *plugin) StopContainer(_ *api.PodSandbox, container *api.Container) ([]*api.ContainerUpdate, error) {
var update = []*api.ContainerUpdate{}
_, imageName, err := GetImageName(container.Annotations)
Expand Down
6 changes: 5 additions & 1 deletion misc/example/optimizer-nri-plugin.conf
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,8 @@ timeout = 0
overwrite = false
# The events that containerd subscribes to.
# Do not change this element.
events = [ "StartContainer", "StopContainer" ]
events = [ "StartContainer", "StopContainer" ]
# The domain and port of prefetchlist store service
# ip mappings should be established in advance
domain = "prefetchlist.store.local"
port = "1323"
96 changes: 96 additions & 0 deletions tools/prefetchlistStoreService/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright (c) 2023. Nydus Developers. All rights reserved.
*
* SPDX-License-Identifier: Apache-2.0
*/
package main

import (
"encoding/json"
"fmt"
"io"
"net/http"
"sync"
)

type PrefetchFile struct {
Path string
}

type CacheItem struct {
ImageName string
ContainerName string
PrefetchFiles []PrefetchFile
}

type Cache struct {
Items map[string]*CacheItem
mutex sync.Mutex
}

func (cache *Cache) key(imageName, containerName string) string {
return fmt.Sprintf("%s,%s", imageName, containerName)
}

func (cache *Cache) Get(imageName, containerName string) *CacheItem {
cache.mutex.Lock()
defer cache.mutex.Unlock()

return cache.Items[cache.key(imageName, containerName)]
}

func (cache *Cache) Set(item CacheItem) {
cache.mutex.Lock()
defer cache.mutex.Unlock()

cache.Items[cache.key(item.ImageName, item.ContainerName)] = &item
}

var serverCache Cache

func uploadHandler(w http.ResponseWriter, r *http.Request) {
var item CacheItem
body, err := io.ReadAll(r.Body)
if err != nil {
http.Error(w, "Failed to read request body", http.StatusBadRequest)
return
}
err = json.Unmarshal(body, &item)
if err != nil {
http.Error(w, "Invalid request payload", http.StatusBadRequest)
return
}

serverCache.Set(item)
fmt.Fprintf(w, "Uploaded CacheItem for %s, %s successfully", item.ImageName, item.ContainerName)
}

func downloadHandler(w http.ResponseWriter, r *http.Request) {
imageName := r.URL.Query().Get("imageName")
containerName := r.URL.Query().Get("containerName")

item := serverCache.Get(imageName, containerName)
if item == nil {
http.Error(w, "CacheItem not found", http.StatusNotFound)
return
}

err := json.NewEncoder(w).Encode(item)
if err != nil {
http.Error(w, "Failed to encode CacheItem to JSON", http.StatusInternalServerError)
return
}
}

func main() {
serverCache = Cache{Items: make(map[string]*CacheItem)}

http.HandleFunc("/api/v1/prefetch/upload", uploadHandler)
http.HandleFunc("/api/v1/prefetch/download", downloadHandler)

fmt.Println("Server started on :1323")
err := http.ListenAndServe(":1323", nil)
if err != nil {
fmt.Printf("Failed to start server: %v\n", err)
}
}

0 comments on commit 059d595

Please sign in to comment.