-
Notifications
You must be signed in to change notification settings - Fork 569
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Showing
23 changed files
with
621 additions
and
1,897 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
115 changes: 28 additions & 87 deletions
115
deepfence_agent/tools/apache/scope/app/cache_topology.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,112 +1,53 @@ | ||
package app | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"fmt" | ||
"github.com/gomodule/redigo/redis" | ||
log "github.com/sirupsen/logrus" | ||
"github.com/ugorji/go/codec" | ||
redisCache "github.com/weaveworks/scope/cache" | ||
"github.com/weaveworks/scope/render" | ||
"github.com/weaveworks/scope/render/detailed" | ||
"github.com/weaveworks/scope/report" | ||
"os" | ||
"strconv" | ||
"time" | ||
) | ||
|
||
var ( | ||
redisExpiryTime = 180 // 3 minutes | ||
) | ||
|
||
func cacheInRedis(redisPool *redis.Pool, topologyID string, reporter Reporter, rpt *report.Report) { | ||
ctx := context.Background() | ||
values := make(map[string][]string) | ||
renderer, filter, err := topologyRegistry.RendererForTopology(topologyID, values, *rpt) | ||
if err != nil { | ||
return | ||
} | ||
rc := RenderContextForReporter(reporter, *rpt) | ||
nodeSummaries := detailed.Summaries(ctx, rc, render.Render(ctx, rc.Report, renderer, filter).Nodes, true) | ||
redisConn := redisPool.Get() | ||
defer redisConn.Close() | ||
|
||
buf := &bytes.Buffer{} | ||
err = codec.NewEncoder(buf, &codec.JsonHandle{}).Encode(APITopology{Nodes: nodeSummaries}) | ||
|
||
redisKey := "topology_" + topologyID | ||
_, err = redisConn.Do("SETEX", redisKey, redisExpiryTime, string(buf.Bytes())) | ||
if err != nil { | ||
log.Printf("Error: SETEX %s: %v\n", redisKey, err) | ||
} | ||
} | ||
|
||
func cache(redisPool *redis.Pool, reporter Reporter) error { | ||
ctx := context.Background() | ||
rep, err := reporter.Report(ctx, time.Now()) | ||
if err != nil { | ||
return err | ||
} | ||
go cacheInRedis(redisPool, hostsID, reporter, &rep) | ||
go cacheInRedis(redisPool, containersID, reporter, &rep) | ||
go cacheInRedis(redisPool, containersByImageID, reporter, &rep) | ||
go cacheInRedis(redisPool, processesID, reporter, &rep) | ||
go cacheInRedis(redisPool, podsID, reporter, &rep) | ||
go cacheInRedis(redisPool, kubeControllersID, reporter, &rep) | ||
go cacheInRedis(redisPool, servicesID, reporter, &rep) | ||
return nil | ||
} | ||
|
||
func CacheTopology(reporter Reporter) { | ||
redisPool, _ := newRedisPool() | ||
|
||
ctx := context.Background() | ||
wait := make(chan struct{}, 1) | ||
reporter.WaitOn(ctx, wait) | ||
defer reporter.UnWait(ctx, wait) | ||
ticker := time.NewTicker(10 * time.Second) | ||
defer ticker.Stop() | ||
|
||
var rep report.Report | ||
var err error | ||
values := make(map[string][]string) | ||
|
||
topologies := map[string]*redisCache.RedisCache{ | ||
hostsID: redisCache.NewRedisCache(hostsID), | ||
containersID: redisCache.NewRedisCache(containersID), | ||
containersByImageID: redisCache.NewRedisCache(containersByImageID), | ||
processesID: redisCache.NewRedisCache(processesID), | ||
podsID: redisCache.NewRedisCache(podsID), | ||
kubeControllersID: redisCache.NewRedisCache(kubeControllersID), | ||
servicesID: redisCache.NewRedisCache(servicesID), | ||
} | ||
|
||
for { | ||
select { | ||
case <-ticker.C: | ||
err := cache(redisPool, reporter) | ||
rep, err = reporter.Report(ctx, time.Now()) | ||
if err != nil { | ||
log.Error(err) | ||
continue | ||
} | ||
for topologyID, r := range topologies { | ||
go func(topologyID string, r *redisCache.RedisCache) { | ||
renderer, filter, err := topologyRegistry.RendererForTopology(topologyID, values, rep) | ||
if err != nil { | ||
return | ||
} | ||
rc := RenderContextForReporter(reporter, rep) | ||
r.Update(detailed.Summaries(ctx, rc, render.Render(ctx, rc.Report, renderer, filter).Nodes, true)) | ||
}(topologyID, r) | ||
} | ||
} | ||
} | ||
} | ||
|
||
func newRedisPool() (*redis.Pool, int) { | ||
var dbNumInt int | ||
var errVal error | ||
dbNumStr := os.Getenv("REDIS_DB_NUMBER") | ||
if dbNumStr == "" { | ||
dbNumInt = 0 | ||
} else { | ||
dbNumInt, errVal = strconv.Atoi(dbNumStr) | ||
if errVal != nil { | ||
dbNumInt = 0 | ||
} | ||
} | ||
redisHost := os.Getenv("REDIS_HOST") | ||
if redisHost == "" { | ||
redisHost = "deepfence-redis" | ||
} | ||
redisPort := os.Getenv("REDIS_PORT") | ||
if redisPort == "" { | ||
redisPort = "6379" | ||
} | ||
redisAddr := fmt.Sprintf("%s:%s", redisHost, redisPort) | ||
return &redis.Pool{ | ||
MaxIdle: 10, | ||
MaxActive: 30, // max number of connections | ||
Dial: func() (redis.Conn, error) { | ||
c, err := redis.Dial("tcp", redisAddr, redis.DialDatabase(dbNumInt)) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return c, err | ||
}, | ||
}, dbNumInt | ||
} |
Oops, something went wrong.