Skip to content

Commit

Permalink
Support logs and metadata from multiple replicas (#219)
Browse files Browse the repository at this point in the history
Signed-off-by: Bernd Verst <4535280+berndverst@users.noreply.github.com>
  • Loading branch information
berndverst committed Aug 9, 2022
1 parent ff4ea91 commit 69cb0e8
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 35 deletions.
22 changes: 16 additions & 6 deletions cmd/webserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"bufio"
"encoding/json"
"fmt"
"github.com/dapr/dashboard/pkg/version"
"io"
"io/ioutil"
"log"
"net/http"
Expand All @@ -26,6 +26,8 @@ import (
"strings"
"time"

"github.com/dapr/dashboard/pkg/version"

components "github.com/dapr/dashboard/pkg/components"
configurations "github.com/dapr/dashboard/pkg/configurations"
instances "github.com/dapr/dashboard/pkg/instances"
Expand Down Expand Up @@ -69,9 +71,11 @@ type DaprVersion struct {
RuntimeVersion string `json:"runtimeVersion"`
}

var inst instances.Instances
var comps components.Components
var configs configurations.Configurations
var (
inst instances.Instances
comps components.Components
configs configurations.Configurations
)

// RunWebServer starts the web server that serves the Dapr UI dashboard and the API
func RunWebServer(port int) {
Expand Down Expand Up @@ -246,12 +250,18 @@ func getLogStreamsHandler(w http.ResponseWriter, r *http.Request) {
return
}
defer c.Close()
reader, err := inst.GetLogStream(scope, id, container)
streams, err := inst.GetLogStream(scope, id, container)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer reader.Close()

var readStreams []io.Reader
for _, stream := range streams {
defer stream.Close()
readStreams = append(readStreams, stream)
}
reader := io.MultiReader(readStreams...)

lineReader := bufio.NewReader(reader)
logReader := dashboard_log.NewReader(container, lineReader)
Expand Down
98 changes: 69 additions & 29 deletions pkg/instances/instances.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type Instances interface {
GetInstance(scope string, id string) Instance
DeleteInstance(scope string, id string) error
GetContainers(scope string, id string) []string
GetLogStream(scope, id, containerName string) (io.ReadCloser, error)
GetLogStream(scope, id, containerName string) ([]io.ReadCloser, error)
GetDeploymentConfiguration(scope string, id string) string
GetControlPlaneStatus() []StatusOutput
GetMetadata(scope string, id string) MetadataOutput
Expand Down Expand Up @@ -141,7 +141,7 @@ func (i *instances) GetContainers(scope string, id string) []string {
}

// GetLogStream returns a stream of bytes from K8s logs
func (i *instances) GetLogStream(scope, id, containerName string) (io.ReadCloser, error) {
func (i *instances) GetLogStream(scope, id, containerName string) ([]io.ReadCloser, error) {
ctx := context.Background()
if i.kubeClient != nil {
resp, err := i.kubeClient.AppsV1().Deployments(scope).List(ctx, (meta_v1.ListOptions{}))
Expand All @@ -160,8 +160,9 @@ func (i *instances) GetLogStream(scope, id, containerName string) (io.ReadCloser
return nil, err
}

if len(pods.Items) > 0 {
p := pods.Items[0]
var logstreams []io.ReadCloser

for _, p := range pods.Items {
name := p.ObjectMeta.Name

for _, container := range p.Spec.Containers {
Expand All @@ -171,13 +172,21 @@ func (i *instances) GetLogStream(scope, id, containerName string) (io.ReadCloser
options.Container = container.Name
options.Timestamps = true
options.TailLines = &tailLines
options.Follow = true
if len(pods.Items) == 1 {
options.Follow = true
} else {
options.Follow = false // this is necessary to show logs from multiple replicas
}

res := i.kubeClient.CoreV1().Pods(p.ObjectMeta.Namespace).GetLogs(name, &options)
return res.Stream(ctx)
stream, streamErr := res.Stream(ctx)
if streamErr == nil {
logstreams = append(logstreams, stream)
}
}
}
}
return logstreams, nil
}
}
}
Expand Down Expand Up @@ -333,11 +342,11 @@ func (i *instances) GetControlPlaneStatus() []StatusOutput {
return []StatusOutput{}
}

// GetMetadata returns the result from the /v1.0/metadata endpoint
// GetMetadata returns the result from the /v1.0/metadata endpoint from all replicas
func (i *instances) GetMetadata(scope string, id string) MetadataOutput {
ctx := context.Background()
url := ""
secondaryUrl := ""
var url []string
var secondaryUrl []string
if i.kubeClient != nil {
resp, err := i.kubeClient.AppsV1().Deployments(scope).List(ctx, (meta_v1.ListOptions{}))
if err != nil || len(resp.Items) == 0 {
Expand All @@ -358,43 +367,74 @@ func (i *instances) GetMetadata(scope string, id string) MetadataOutput {

if len(pods.Items) > 0 {
p := pods.Items[0]
url = fmt.Sprintf("http://%v:%v/v1.0/metadata", p.Status.PodIP, 3501)
secondaryUrl = fmt.Sprintf("http://%v:%v/v1.0/metadata", p.Status.PodIP, 3500)
url = append(url, fmt.Sprintf("http://%v:%v/v1.0/metadata", p.Status.PodIP, 3501))
secondaryUrl = append(secondaryUrl, fmt.Sprintf("http://%v:%v/v1.0/metadata", p.Status.PodIP, 3500))
}
}
}
}

} else {
port := i.GetInstance(scope, id).HTTPPort
url = fmt.Sprintf("http://localhost:%v/v1.0/metadata", port)
url = append(url, fmt.Sprintf("http://localhost:%v/v1.0/metadata", port))
}
if url != "" {
resp, err := http.Get(url)
if err != nil && secondaryUrl != "" {
log.Println(err)
resp, err = http.Get(secondaryUrl)
if err != nil {
log.Println(err)
return MetadataOutput{}
if len(url) != 0 {
data := getMetadataOutputFromURLs(url[0], secondaryUrl[0])

if len(url) > 1 {
// merge the actor metadata from the other replicas

for i := range url[1:] {
replicaData := getMetadataOutputFromURLs(url[i+1], secondaryUrl[i+1])

for _, actor := range replicaData.Actors {
// check if this actor type is already in the list
found := false

for _, knownActor := range data.Actors {
if knownActor.Type == actor.Type {
found = true
knownActor.Count += actor.Count
break
}
}

if !found {
data.Actors = append(data.Actors, actor)
}
}
}
}

defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
return data
}
return MetadataOutput{}
}

func getMetadataOutputFromURLs(primaryURL string, secondaryURL string) MetadataOutput {
resp, err := http.Get(primaryURL)
if err != nil && len(secondaryURL) != 0 {
log.Println(err)
resp, err = http.Get(secondaryURL)
if err != nil {
log.Println(err)
return MetadataOutput{}
}
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Println(err)
return MetadataOutput{}
}

var data MetadataOutput
if err := json.Unmarshal(body, &data); err != nil {
log.Println(err)
return MetadataOutput{}
}
return data
var data MetadataOutput
if err := json.Unmarshal(body, &data); err != nil {
log.Println(err)
return MetadataOutput{}
}
return MetadataOutput{}

return data
}

// GetInstances returns the result of the appropriate environment's GetInstance function
Expand Down

0 comments on commit 69cb0e8

Please sign in to comment.