Skip to content

Commit

Permalink
feat: improve warnings, fix counter race
Browse files Browse the repository at this point in the history
  • Loading branch information
wyattjoh committed Mar 30, 2022
1 parent f35c443 commit e40115b
Show file tree
Hide file tree
Showing 10 changed files with 95 additions and 30 deletions.
10 changes: 0 additions & 10 deletions README.md
Expand Up @@ -221,13 +221,3 @@ go build -o <location>
`--disableMonotonicCursorTimes` flag. This means that every timestamp emitted
that shares the same second time will automatically have it's ms time
incremented to prevent collisions.

<!-- STARTED 6.3GB RAM -->

```bash
cat __sensitive__/logs.json | jq 'select(.msg == "memory stats" and .run == 1648582446).alloc' | sort -n | tail -n1
cat __sensitive__/logs.json | jq 'select(.msg == "memory stats" and .run == 1648584673).alloc' | sort -n | tail -n1

cat __sensitive__/logs.json | jq 'select(.msg == "memory stats" and .run == 1648582446).sys' | sort -n | tail -n1
cat __sensitive__/logs.json | jq 'select(.msg == "memory stats" and .run == 1648584673).sys' | sort -n | tail -n1
```
16 changes: 6 additions & 10 deletions internal/utility/counter/counter.go
Expand Up @@ -9,26 +9,22 @@ func New(title string, total int) *Counter {
color.New(color.Bold).Printf("\n%s\n", title)

return &Counter{
c: pb.Full.Start(total),
total: total,
bar: pb.Full.Start(total),
}
}

type Counter struct {
c *pb.ProgressBar
current int
total int
bar *pb.ProgressBar
}

func (c *Counter) Increment() {
c.current++
c.c.Increment()
c.bar.Increment()
}

func (c *Counter) Finish() {
if c.current < c.total {
c.c.AddTotal(int64(c.current - c.total))
if c.bar.Current() < c.bar.Total() {
c.bar.AddTotal(c.bar.Current() - c.bar.Total())
}

c.c.Finish()
c.bar.Finish()
}
2 changes: 1 addition & 1 deletion internal/warnings/actions.go
Expand Up @@ -3,5 +3,5 @@ package warnings
var (
// When an action is found in import data that does correspond to an action on
// a comment, this warning is emitted.
NonCommentAction = NewWarning()
NonCommentAction = NewWarning("NonCommentAction", "an action was encountered that was not linked to a comment and was therefore skipped")
)
19 changes: 19 additions & 0 deletions internal/warnings/registry.go
@@ -0,0 +1,19 @@
package warnings

var all []*Warning

func Every(fn func(warning *Warning)) {
for _, warning := range all {
fn(warning)
}
}

func register(warning *Warning) {
all = append(all, warning)
}

func init() {
register(NonCommentAction)
register(UnsupportedDateFormat)
register(UnsupportedUserProfileProvider)
}
2 changes: 1 addition & 1 deletion internal/warnings/time.go
Expand Up @@ -2,5 +2,5 @@ package warnings

var (
// When an unsupported date format is encountered, this warning is emitted.
UnsupportedDateFormat = NewWarning()
UnsupportedDateFormat = NewWarning("UnsupportedDateFormat", "a date format was encountered and was not processed")
)
2 changes: 1 addition & 1 deletion internal/warnings/users.go
Expand Up @@ -3,5 +3,5 @@ package warnings
var (
// When a user profile is found that is not supported, this warning is
// emitted.
UnsupportedUserProfileProvider = NewWarning()
UnsupportedUserProfileProvider = NewWarning("UnsupportedUserProfileProvider", "a user profile provider was encountered that was not supported, and was therefore skipped")
)
49 changes: 43 additions & 6 deletions internal/warnings/warnings.go
@@ -1,22 +1,35 @@
package warnings

import "sync"
import (
"fmt"
"sync"
"sync/atomic"
)

func NewWarning() *Warning {
func NewWarning(name, description string) *Warning {
return &Warning{
once: &sync.Once{},
m: &sync.Map{},
name: name,
description: description,
once: &sync.Once{},
m: &sync.Map{},
}
}

type Warning struct {
once *sync.Once
m *sync.Map
name string
description string

once *sync.Once
m *sync.Map
occurrences int32
}

// OnceWith will only allow the function to execute if the key has not been
// seen yet.
func (w *Warning) OnceWith(fn func(), key string) {
// Add to the number of occurrences.
atomic.AddInt32(&w.occurrences, 1)

// Load the empty struct or store a new one. If it was loaded (and not
// stored) then we have already seen this key and it should be skipped.
if _, ok := w.m.LoadOrStore(key, struct{}{}); ok {
Expand All @@ -28,5 +41,29 @@ func (w *Warning) OnceWith(fn func(), key string) {

// Once will only allow the function to execute if it has not before.
func (w *Warning) Once(fn func()) {
// Add to the number of occurrences.
atomic.AddInt32(&w.occurrences, 1)

w.once.Do(fn)
}

func (w *Warning) String() string {
return fmt.Sprintf("%s: %s", w.name, w.description)
}

func (w *Warning) Occurrences() int32 {
return atomic.LoadInt32(&w.occurrences)
}

func (w *Warning) Keys() []string {
keys := make([]string, 0)
w.m.Range(func(key, value interface{}) bool {
if str, ok := key.(string); ok {
keys = append(keys, str)
}

return true
})

return keys
}
14 changes: 14 additions & 0 deletions main.go
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/coralproject/coral-importer/common/coral"
"github.com/coralproject/coral-importer/internal/warnings"
"github.com/coralproject/coral-importer/strategies/csv"
"github.com/coralproject/coral-importer/strategies/legacy"
"github.com/coralproject/coral-importer/strategies/livefyre"
Expand Down Expand Up @@ -234,5 +235,18 @@ func main() {
logrus.WithError(err).Fatal()
}

warnings.Every(func(warning *warnings.Warning) {
occurrences := warning.Occurrences()
if occurrences == 0 {
return
}

logrus.WithFields(logrus.Fields{
"warning": warning.String(),
"occurrences": occurrences,
"keys": warning.Keys(),
}).Warn("warning occurred")
})

color.New(color.Bold, color.FgGreen).Printf("\nImport Completed, took %s\n", time.Since(start))
}
4 changes: 4 additions & 0 deletions strategies/csv/csv.go
Expand Up @@ -185,6 +185,10 @@ func Import(c strategies.Context) error {
// Reconstruct all the family relationships from the parentID map.
reconstructor := common.NewReconstructor()
for commentID, comment := range comments {
if comment.ParentID == "" {
continue
}

reconstructor.AddIDs(commentID, comment.ParentID)
}

Expand Down
7 changes: 6 additions & 1 deletion strategies/legacy/legacy.go
Expand Up @@ -148,8 +148,13 @@ func ReconstructFamilies(ctx *Context) {

// Reconstruct all the family relationships from the parentID map.
for commentID, comment := range ctx.comments {
ctx.Reconstructor.AddIDs(commentID, comment.ParentID)
bar.Increment()

if comment.ParentID == "" {
continue
}

ctx.Reconstructor.AddIDs(commentID, comment.ParentID)
}
}

Expand Down

0 comments on commit e40115b

Please sign in to comment.