Michael's feedback
gfichtenholt committed Jul 15, 2021
1 parent dc207be commit 1df6ec4
Showing 4 changed files with 59 additions and 108 deletions.
104 changes: 27 additions & 77 deletions cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/cache.go
@@ -15,7 +15,6 @@ package main
 import (
 	"context"
 	"fmt"
-	"math"
 	"os"
 	"strconv"
 	"sync"
@@ -125,34 +124,38 @@ func newCacheWithRedisClient(config cacheConfig, redisCli *redis.Client) (*Resou
 
 	c := ResourceWatcherCache{
 		config: config,
-		redisCli: redisCli,
-		initOk: false,
-		initMutex: sync.Mutex{},
+		redisCli: redisCli,
 	}
 
 	c.initMutex.Lock()
 	go c.startResourceWatcher()
 
 	return &c, nil
 }
 
 func (c *ResourceWatcherCache) startResourceWatcher() {
 	log.Infof("+ResourceWatcherCache startResourceWatcher")
-	// can't defer c.watcherMutex.Unlock() because when all is well,
+	// can't defer c.initMutex.Unlock() because when all is well,
 	// we never return from this func
 
 	if !c.initOk {
-		ch, err := c.newResourceWatcherChan()
-		if err != nil {
-			c.initMutex.Unlock()
-			log.Errorf("Failed to start resource watcher for [%s] due to: %v", c.config.gvr, err)
-			return
-		}
-		c.initOk = true
-		c.initMutex.Unlock()
-		log.Infof("Watcher for [%s] successfully started. waiting for events...", c.config.gvr)
-
-		c.processEvents(ch)
+		for {
+			ch, err := c.newResourceWatcherChan()
+			if err != nil {
+				c.initMutex.Unlock()
+				log.Errorf("Failed to start resource watcher for [%s] due to: %v", c.config.gvr, err)
+				return
+			}
+			c.initOk = true
+			c.initMutex.Unlock()
+			log.Infof("Watcher for [%s] successfully started. waiting for events...", c.config.gvr)
+
+			c.processEvents(ch)
+			// if we get here the watch needs to be re-started
+			c.initMutex.Lock()
+			c.initOk = false
+		}
 	} else {
 		c.initMutex.Unlock()
 		log.Infof("watcher already started. exiting...")
@@ -187,7 +190,12 @@ func (c *ResourceWatcherCache) newResourceWatcherChan() (<-chan watch.Event, err
 // this is an infinite loop that waits for new events and processes them when they happen
 func (c *ResourceWatcherCache) processEvents(ch <-chan watch.Event) {
 	for {
-		event := <-ch
+		event, ok := <-ch
+		if !ok {
+			log.Errorf("Channel already closed. Will attempt to restart the watcher")
+			// this may happen and we will need to restart the watcher
+			return
+		}
 		if event.Type == "" {
 			// not quite sure why this happens (the docs don't say), but it seems to happen quite often
 			continue
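The switch from event := <-ch to event, ok := <-ch matters because a receive from a closed channel succeeds immediately with a zero value; without the ok flag the loop would spin on empty events instead of noticing the closure and returning so the watcher can restart. A runnable illustration of the comma-ok idiom (not from this commit):

    package main

    import "fmt"

    func main() {
    	ch := make(chan int, 1)
    	ch <- 42
    	close(ch)

    	v, ok := <-ch
    	fmt.Println(v, ok) // 42 true: a buffered value was still pending

    	v, ok = <-ch
    	fmt.Println(v, ok) // 0 false: the channel is drained and closed
    }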
@@ -320,21 +328,6 @@ func (c *ResourceWatcherCache) fetchForOne(key string) (interface{}, error) {
 	return val, nil
 }
 
-const (
-	// max number of concurrent workers reading results for fetch() at the same time
-	maxWorkers = 10
-)
-
-type fetchValueJob struct {
-	key string
-}
-
-type fetchValueJobResult struct {
-	key   string
-	value interface{}
-	err   error
-}
-
 // return all keys, optionally matching a given filter (repository list)
 func (c *ResourceWatcherCache) listKeys(filters []string) ([]string, error) {
 	if err := c.checkInit(); err != nil {
@@ -384,61 +377,18 @@ func (c *ResourceWatcherCache) listKeys(filters []string) ([]string, error) {
 	return resultKeys, nil
 }
 
-// each object is read from redis in a separate go routine (lightweight thread of execution)
 // returns a map with same keys as input and corresponding values out of the cache
-//
-// TODO (gfichtenholt) this func originally was written such that it was doing all the heavy work,
-// like indexing a repo for each key when asked, hence the use of concurrent go routines.
-// Now all it does is fetch pre-computed values out of the cache, which by definition should be
-// very quick, so I am not sure it warrants the complexity below.
-// Perhaps I need to simplify it and fetch everything in a single sequence in a for loop
 func (c *ResourceWatcherCache) fetchForMultiple(keys []string) (map[string]interface{}, error) {
 	if err := c.checkInit(); err != nil {
 		return nil, err
 	}
 
 	response := make(map[string]interface{})
-	var wg sync.WaitGroup
-	numWorkers := int(math.Min(float64(len(keys)), float64(maxWorkers)))
-	requestChan := make(chan fetchValueJob, numWorkers)
-	responseChan := make(chan fetchValueJobResult, numWorkers)
-
-	// Process only at most maxWorkers at a time
-	for i := 0; i < numWorkers; i++ {
-		wg.Add(1)
-		go func() {
-			// The following loop will only terminate when the request channel is closed (and there are no more items)
-			for job := range requestChan {
-				result, err := c.fetchForOne(job.key)
-				responseChan <- fetchValueJobResult{job.key, result, err}
-			}
-			wg.Done()
-		}()
-	}
-	go func() {
-		wg.Wait()
-		close(responseChan)
-	}()
-
-	go func() {
-		for _, key := range keys {
-			requestChan <- fetchValueJob{key}
-		}
-		close(requestChan)
-	}()
-
-	// Start receiving results
-	// The following loop will only terminate when the response channel is closed, i.e.
-	// after all the requests have been processed
-	for resp := range responseChan {
-		if resp.err == nil {
-			// resp.result may be nil when there is a cache miss
-			if resp.value != nil {
-				response[resp.key] = resp.value
-			}
-		} else {
-			log.Errorf("fetch value for key [%s] failed due to %v", resp.key, resp.err)
-		}
-	}
+	for _, key := range keys {
+		result, err := c.fetchForOne(key)
+		if err != nil {
+			return nil, err
+		}
+		response[key] = result
+	}
 	return response, nil
 }
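The simplification also tightens the error contract: the old concurrent version logged a failed key and kept going, and skipped nil cache misses, whereas the new loop fails the whole batch on the first error and stores misses as nil map entries. If per-key tolerance were ever wanted back, a sketch along these lines (an assumption, not part of this commit) would preserve the sequential shape:

    for _, key := range keys {
    	result, err := c.fetchForOne(key)
    	if err != nil {
    		log.Errorf("fetch value for key [%s] failed due to %v", key, err)
    		continue // tolerate per-key failures, as the old version did
    	}
    	if result != nil { // skip cache misses, as the old version did
    		response[key] = result
    	}
    }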
6 changes: 3 additions & 3 deletions cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/server.go
@@ -187,19 +187,19 @@ func (s *Server) GetAvailablePackageSummaries(ctx context.Context, request *core
 		return nil, err
 	}
 
-	responsePackages, err := getPaginatedSummariesWithFilters(int(pageSize), pageOffset, cachedCharts, request.GetFilterOptions())
+	packageSummaries, err := filterAndPaginateChartsAsSummaries(request.GetFilterOptions(), int(pageSize), pageOffset, cachedCharts)
 	if err != nil {
 		return nil, err
 	}
 
 	// Only return a next page token if the request was for pagination and
 	// the results are a full page.
 	nextPageToken := ""
-	if pageSize > 0 && len(responsePackages) == int(pageSize) {
+	if pageSize > 0 && len(packageSummaries) == int(pageSize) {
 		nextPageToken = fmt.Sprintf("%d", pageOffset+1)
 	}
 	return &corev1.GetAvailablePackageSummariesResponse{
-		AvailablePackagesSummaries: responsePackages,
+		AvailablePackagesSummaries: packageSummaries,
 		NextPageToken: nextPageToken,
 	}, nil
 }
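Given the token contract here (the next page token is just the next zero-based page offset rendered as a string), a client pages until the token comes back empty. A rough sketch, assuming the request's PaginationOptions message exposes PageToken and PageSize fields as in the core packages API:

    token := ""
    for {
    	resp, err := server.GetAvailablePackageSummaries(ctx, &corev1.GetAvailablePackageSummariesRequest{
    		PaginationOptions: &corev1.PaginationOptions{PageToken: token, PageSize: 25},
    	})
    	if err != nil {
    		break
    	}
    	// ... consume resp.AvailablePackagesSummaries ...
    	if resp.NextPageToken == "" {
    		break // a short or unpaginated page yields no token: done
    	}
    	token = resp.NextPageToken
    }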
6 changes: 3 additions & 3 deletions cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/server_test.go
@@ -795,7 +795,7 @@ func TestGetAvailablePackageSummaries(t *testing.T) {
 	}
 }
 
-func TestGetAvailablePackageSummariesAfterRepoIndexUpdate(t *testing.T) {
+func TestGetAvailablePackageSummaryAfterRepoIndexUpdate(t *testing.T) {
 	t.Run("test get available package summaries after repo index is updated", func(t *testing.T) {
 		indexYamlBeforeUpdateBytes, err := ioutil.ReadFile("testdata/index-before-update.yaml")
 		if err != nil {
@@ -948,7 +948,7 @@ func TestGetAvailablePackageSummariesAfterRepoIndexUpdate(t *testing.T) {
 	})
 }
 
-func TestGetAvailablePackageSummariesAfterFluxHelmRepoDelete(t *testing.T) {
+func TestGetAvailablePackageSummaryAfterFluxHelmRepoDelete(t *testing.T) {
 	t.Run("test get available package summaries after flux helm repository CRD gets deleted", func(t *testing.T) {
 		indexYaml, err := ioutil.ReadFile("testdata/valid-index.yaml")
 		if err != nil {
@@ -1489,7 +1489,7 @@ func newServerWithWatcher(repos ...runtime.Object) (*Server, redismock.ClientMoc
 
 	// this is so we can emulate actual k8s server firing events
 	// see https://github.com/kubernetes/kubernetes/issues/54075 for explanation
-	watcher := watch.NewFake()
+	watcher := watch.NewFakeWithChanSize(len(repos), true)
 
 	dynamicClient.Fake.PrependWatchReactor(
 		"*",
51 changes: 26 additions & 25 deletions cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/utils.go
@@ -240,49 +240,50 @@ func pageOffsetFromPageToken(pageToken string) (int, error) {
 	return int(offset), nil
 }
 
-func getPaginatedSummariesWithFilters(pageSize, pageOffset int, cachedCharts map[string]interface{}, filters *corev1.FilterOptions) ([]*corev1.AvailablePackageSummary, error) {
+func filterAndPaginateChartsAsSummaries(filters *corev1.FilterOptions, pageSize, pageOffset int, cachedCharts map[string]interface{}) ([]*corev1.AvailablePackageSummary, error) {
 	// this loop is here for 3 reasons:
 	// 1) to convert from []interface{} which is what the generic cache implementation
 	//    returns for cache hits to a typed array object.
 	// 2) perform any filtering of the results as needed, pending redis support for
 	//    querying values stored in cache (see discussion in https://github.com/kubeapps/kubeapps/issues/3032)
 	// 3) if pagination was requested, only return up to one page size of results
-	responsePackages := make([]*corev1.AvailablePackageSummary, 0)
+	summaries := make([]*corev1.AvailablePackageSummary, 0)
 	i := 0
 	startAt := -1
 	if pageSize > 0 {
 		startAt = int(pageSize) * pageOffset
 	}
 	for _, packages := range cachedCharts {
-		if packages != nil {
-			typedCharts, ok := packages.([]chart.Chart)
-			if !ok {
-				return nil, status.Errorf(
-					codes.Internal,
-					"Unexpected value fetched from cache: %v", packages)
-			} else {
-				for _, chart := range typedCharts {
-					if passesFilter(chart, filters) {
-						i++
-						if startAt < 0 || startAt < i {
-							pkg, err := availablePackageSummaryFromChart(&chart)
-							if err != nil {
-								return nil, status.Errorf(
-									codes.Internal,
-									"Unable to parse chart to an AvailablePackageSummary: %v",
-									err)
-							}
-							responsePackages = append(responsePackages, pkg)
-							if pageSize > 0 && len(responsePackages) == int(pageSize) {
-								return responsePackages, nil
-							}
-						}
-					}
-				}
-			}
-		}
+		if packages == nil {
+			continue
+		}
+		typedCharts, ok := packages.([]chart.Chart)
+		if !ok {
+			return nil, status.Errorf(
+				codes.Internal,
+				"Unexpected value fetched from cache: %v", packages)
+		} else {
+			for _, chart := range typedCharts {
+				if passesFilter(chart, filters) {
+					i++
+					if startAt < i {
+						pkg, err := availablePackageSummaryFromChart(&chart)
+						if err != nil {
+							return nil, status.Errorf(
+								codes.Internal,
+								"Unable to parse chart to an AvailablePackageSummary: %v",
+								err)
+						}
+						summaries = append(summaries, pkg)
+						if pageSize > 0 && len(summaries) == int(pageSize) {
+							return summaries, nil
+						}
+					}
+				}
+			}
+		}
 	}
-	return responsePackages, nil
+	return summaries, nil
 }
 
 // implements plug-in specific cache-related functionality
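The paging window in this function counts only charts that pass the filter: emission starts once the running count i exceeds startAt = pageSize * pageOffset, and stops after one full page. The dropped startAt < 0 clause was dead code, since i is incremented before the comparison and startAt is -1 when pagination is off, so startAt < i already holds. The same window logic in isolation (a hypothetical helper, for illustration only):

    func page(items []string, pageSize, pageOffset int) []string {
    	out := []string{}
    	i := 0
    	startAt := -1
    	if pageSize > 0 {
    		startAt = pageSize * pageOffset
    	}
    	for _, it := range items {
    		i++ // counts every item that would pass the filter
    		if startAt < i {
    			out = append(out, it)
    			if pageSize > 0 && len(out) == pageSize {
    				break // one full page collected
    			}
    		}
    	}
    	return out
    }

    // page([]string{"a", "b", "c", "d", "e"}, 2, 1) returns []string{"c", "d"}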