Skip to content

Commit

Permalink
fix influx db creation; minimize counter lock in influx connector
Browse files Browse the repository at this point in the history
  • Loading branch information
cha87de committed Feb 4, 2019
1 parent 2df3d77 commit 0d86d32
Showing 1 changed file with 10 additions and 6 deletions.
16 changes: 10 additions & 6 deletions connectors/influx/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,11 @@ func (connector *Connector) push() {
}

for _, counter := range generalCounters {
counter.Access.Lock()
connector.pushCounter(counter)
counter.Access.Unlock()
}
for _, counter := range customerCounters {
for cid := range counter.CustomerIndex {
counter.Access.Lock()
connector.pushCounterCustomer(counter, cid)
counter.Access.Unlock()
}
}
}
Expand All @@ -104,10 +100,12 @@ func (connector *Connector) pushCounter(counter counters.Counter) {
}

// get measurements as points
counter.Access.Lock()
for hash := range counter.Fields {
pts := transformCounter(counter, []uint32{hash})
bp.AddPoints(pts)
}
counter.Access.Unlock()

// Write the batch
err = connector.influxClient.Write(bp)
Expand All @@ -131,9 +129,11 @@ func (connector *Connector) pushCounterCustomer(counter counters.Counter, cid st
}

// get measurements as points
counter.Access.Lock()
hashes := counter.CustomerIndex[cid]
pts := transformCounter(counter, hashes)
bp.AddPoints(pts)
counter.Access.Unlock()

// Write the batch
err = connector.influxClient.Write(bp)
Expand All @@ -143,11 +143,15 @@ func (connector *Connector) pushCounterCustomer(counter counters.Counter, cid st
}

func (connector *Connector) createDb(dbName string) {
query := client.NewQuery("CREATE DATABASE "+dbName+" WITH DURATION 3d", "", "")
query := client.NewQuery("CREATE DATABASE \""+dbName+"\" WITH DURATION 3d", "", "")
response, err := connector.influxClient.Query(query)
if err != nil && response.Error() != nil {
if err != nil {
fmt.Printf("Error creating database %s in influx: %v\n", dbName, err.Error())
}
if response.Error() != nil {
fmt.Printf("Error creating database %s in influx: %v\n", dbName, response.Error())
}

}

func transformCounter(counter counters.Counter, hashes []uint32) []*client.Point {
Expand Down

0 comments on commit 0d86d32

Please sign in to comment.