Skip to content

Commit

Permalink
Parallel version
Browse files Browse the repository at this point in the history
Added parallelization for cache updates
  • Loading branch information
leucos committed Dec 3, 2020
1 parent 98cb15f commit 6114ffe
Showing 1 changed file with 56 additions and 18 deletions.
74 changes: 56 additions & 18 deletions internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ type App struct {
logger zerolog.Logger
}

type jobResult struct {
distribution string
versions []string
}

var (
// ErrAlreadyInstalled is returned when the requested version is already installed
ErrAlreadyInstalled = errors.New("version already installed")
Expand Down Expand Up @@ -544,44 +549,77 @@ func (a *App) updateGithub() error {
return nil
}

func (a *App) updateLocally(which ...string) error {
bar := progressbar.Default(int64(len(which)), "updating distributions")

for _, d := range which {
bar.Add(1)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
func (a *App) fetcher(id int, jobs <-chan string, res chan<- jobResult, timeout time.Duration) {
for d := range jobs {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

subctx := a.logger.WithContext(ctx)
var err error

a.logger.Debug().Msgf("feching available versions for %q", d)
if _, ok := a.listers[d]; !ok {
a.logger.Error().Msgf("no distribution named %q", d)
continue
r := jobResult{
distribution: d,
}
versions, err := a.listers[d].Get(subctx)

subctx := a.logger.WithContext(ctx)
r.versions, err = a.listers[d].Get(subctx)
if errors.Is(err, list.ErrGithubRateLimitClose) || errors.Is(err, list.ErrGithubRateLimited) {
a.logger.Error().Err(err).Msgf("unable to fetch versions for %q", d)
return err
// return err

continue
}
if err != nil {
a.logger.Error().Err(err).Msgf("unable to fetch versions for %q", d)
// continue
}

a.logger.Debug().Msgf("found versions %q", strings.Join(r.versions, ","))

res <- r
}
}

func (a *App) updateLocally(which ...string) error {
bar := progressbar.Default(int64(len(which)), "updating distributions")

jobs := make(chan string)
res := make(chan jobResult, 1000)
timeout := 1 * time.Second

for w := 1; w <= 8; w++ {
go a.fetcher(w, jobs, res, timeout)
}

count := 0
for _, d := range which {
bar.Add(1)

a.logger.Debug().Msgf("feching available versions for %q", d)
if _, ok := a.listers[d]; !ok {
a.logger.Error().Msgf("no distribution named %q", d)
continue
}

a.logger.Debug().Msgf("found versions %q", strings.Join(versions, ","))
jobs <- d
count++
}

close(jobs)

for c := 0; c < count; c++ {
r := <-res

// Flush cache entry
a.cache[d] = []string{}
a.cache[r.distribution] = []string{}

// Convert versions to canonical form
for _, v := range versions {
for _, v := range r.versions {
version, err := gov.NewVersion(v)
if err != nil {
a.logger.Warn().Err(err).Msgf("ignoring invalid version for %q", d)
a.logger.Debug().Err(err).Msgf("ignoring invalid version for %q", r.distribution)
continue
}
a.cache[d] = append(a.cache[d], version.String())
a.cache[r.distribution] = append(a.cache[r.distribution], version.String())
}
}
return nil
Expand Down

0 comments on commit 6114ffe

Please sign in to comment.