-
Notifications
You must be signed in to change notification settings - Fork 1
/
m3pointdb.go
117 lines (111 loc) · 3.34 KB
/
m3pointdb.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package m3path
import (
"github.com/freddy33/qsm-go/m3db"
"github.com/freddy33/qsm-go/m3point"
"strings"
"sync"
"time"
)
func getPointEnv(env *m3db.QsmEnvironment, pointId int64) (*m3point.Point, error) {
te, err := env.GetOrCreateTableExec(PointsTable)
if err != nil {
return nil, m3db.MakeQsmErrorf("could not get points table exec due to '%s'", err.Error())
}
rows, err := te.Query(SelectPointPerId, pointId)
if err != nil {
return nil, m3db.MakeQsmErrorf("could not select point %d from points table exec due to '%s'", pointId, err.Error())
}
defer te.CloseRows(rows)
if rows.Next() {
res := m3point.Point{}
err = rows.Scan(&res[0], &res[1], &res[2])
if err != nil {
return nil, m3db.MakeQsmErrorf("could not read row of %s for %d due to '%s'", PointsTable, pointId, err.Error())
} else {
return &res, nil
}
}
return nil, m3db.MakeQsmErrorf("point id %d does not exists!", pointId)
}
func getOrCreatePointEnv(env *m3db.QsmEnvironment, p m3point.Point) int64 {
te, err := env.GetOrCreateTableExec(PointsTable)
if err != nil {
Log.Errorf("could not get points table exec due to %v", err)
return -1
}
return getOrCreatePointTe(te, p)
}
func getOrCreatePointTe(te *m3db.TableExec, p m3point.Point) int64 {
rows, err := te.Query(FindPointIdPerCoord, p.X(), p.Y(), p.Z())
if err != nil {
Log.Fatalf("could not select points table exec due to %v", err)
return -1
}
defer te.CloseRows(rows)
var id int64
if rows.Next() {
err = rows.Scan(&id)
if err != nil {
Log.Fatalf("could not convert points table id for %v due to %v", p, err)
return -1
}
return id
} else {
id, err = te.InsertReturnId(p.X(), p.Y(), p.Z())
if err == nil {
return id
} else {
errorMessage := err.Error()
if strings.Contains(errorMessage, "duplicate key") && strings.Contains(errorMessage, "points_x_y_z_key") {
// got concurrent insert, let's just reselect
rows, err = te.Query(FindPointIdPerCoord, p.X(), p.Y(), p.Z())
if err != nil {
Log.Fatalf("could not select points table for %v after duplicate key insert exec due to %v", p, err)
return -1
}
defer te.CloseRows(rows)
if !rows.Next() {
Log.Errorf("selecting points table for %v after duplicate key returns no rows!", p)
return -1
}
err = rows.Scan(&id)
if err != nil {
Log.Errorf("could not convert points table id for %v due to %v", p, err)
return -1
}
return id
} else {
Log.Fatalf("got unknown points table for %v error %v", p, err)
return -1
}
}
}
}
/***************************************************************/
// perf test main
/***************************************************************/
func RunInsertRandomPoints() {
m3db.SetToTestMode()
env := GetFullTestDb(m3db.PerfTestEnv)
// increase concurrency chance with low random
rdMax := m3point.CInt(10)
nbRoutines := 100
nbRound := 250
start := time.Now()
wg := new(sync.WaitGroup)
for r := 0; r < nbRoutines; r++ {
wg.Add(1)
go func() {
for i := 0; i < nbRound; i++ {
randomPoint := m3point.CreateRandomPoint(rdMax)
id := getOrCreatePointEnv(env, randomPoint)
if id <= 0 {
Log.Errorf("failed to insert %v got %d id", randomPoint, id)
}
}
wg.Done()
}()
}
wg.Wait()
Log.Infof("It took %v to create %d points with nb routines=%d max coord %d", time.Now().Sub(start), nbRoutines*nbRound, nbRoutines, rdMax)
}