Skip to content

Commit 98dc7e8

Browse files
committed
mongo connection
1 parent b27f8fa commit 98dc7e8

File tree

373 files changed

+75385
-30
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

373 files changed

+75385
-30
lines changed

.travis.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ after_success:
1010
- bash <(curl -s https://codecov.io/bash)
1111
services:
1212
- mysql
13+
- mongodb
1314

1415
env:
15-
- TEST_MYSQL_CONNECTION="root:@(127.0.0.1:3306)/%dbname%?tx_isolation=SERIALIZABLE&parseTime=true"
16+
- TEST_MYSQL_CONNECTION="root:@(127.0.0.1:3306)/%dbname%?tx_isolation=SERIALIZABLE&parseTime=true";TEST_MONGO_CONNECTION="mongodb://localhost:27017"

README.md

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,17 +60,18 @@ Configurations are stored in json files. Example:
6060
| logger.level | `debug`, `info`, `warning`, `error` | Logging level. |
6161

6262
### DB - Configuration of storage.
63-
| Option | Possible values | Description |
64-
| ----------------- | --------------- | ------------------------------------------ |
65-
| db.implementation | `mysql`, `local` | Select your favorite db, or local storage. |
66-
| db.connection | see next table | DSN for your db. |
63+
| Option | Possible values | Description |
64+
| ----------------- | ------------------------ | ------------------------------------------ |
65+
| db.implementation | `mysql`, `local`, `mongo`| Select your favorite db, or local storage. |
66+
| db.connection | see next table | DSN for your db. |
6767

6868
>Note: Note: Local (in-memory) storage does not support common session storage between grid-instances.
6969
7070
| DB implementation | DSN format |
7171
| ----------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
7272
| mysql | [spec](https://github.com/go-sql-driver/mysql#dsn-data-source-name), example `db_user:db_pass@(db_host:3306)/db_name?parseTime=true` (parseTime=true - required option) |
7373
| local | omit this property, because every instance have its own in-memory storage |
74+
| mongo | NOTE! Mongo db temporary supports only persistent node strategy, example `mongodb://localhost:27017`
7475

7576
### Statsd - Configuration of metrics(optional).
7677
| Option | Possible values | Description |

config/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ type Logger struct {
5353
type DB struct {
5454
Implementation string `json:"implementation"`
5555
Connection string `json:"connection"`
56+
DbName string `json:"db_name"`
5657
}
5758

5859
// Statsd - Settings of metrics sender.

handlers/createSession.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ func (h *CreateSession) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
7474
func (h *CreateSession) tryCreateSession(r *http.Request, capabilities *capabilities.Capabilities) (*proxy.ResponseWriter, error) {
7575
select {
7676
case <-r.Context().Done():
77-
err := errors.New("Request cancelled by client, " + r.Context().Err().Error())
77+
err := errors.New("Request canceled by client, " + r.Context().Err().Error())
7878
return nil, err
7979
default:
8080
}

invoke.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package main
33
import (
44
_ "github.com/go-sql-driver/mysql"
55

6+
"github.com/qa-dev/jsonwire-grid/storage/mongo"
7+
68
"github.com/qa-dev/jsonwire-grid/config"
79

810
"errors"
@@ -32,6 +34,13 @@ func invokeStorageFactory(config config.Config) (factory StorageFactoryInterface
3234
factory = new(mysql.Factory)
3335
case "local":
3436
factory = new(local.Factory)
37+
case "mongo":
38+
for _, s := range config.Grid.StrategyList {
39+
if s.Type != string(pool.NodeTypePersistent) {
40+
err = errors.New("Invalid config: Mongo db supports only persistent node strategy ")
41+
}
42+
}
43+
factory = new(mongo.Factory)
3544
default:
3645
err = errors.New("Invalid config, unknown param [db.implementation=" + config.DB.Implementation + "]")
3746
}

pool/node.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,14 @@ type Node struct {
2222
// The value may depend on the strategy:
2323
// - for constant nodes ip: port
2424
// - for temporary pod.name
25-
Key string `json:"key"`
26-
Type NodeType `json:"type"`
27-
Address string `json:"address"`
28-
Status NodeStatus `json:"status"`
29-
SessionID string `json:"session_id"`
30-
Updated int64 `json:"updated"`
31-
Registered int64 `json:"registered"`
32-
CapabilitiesList []capabilities.Capabilities `json:"capabilities_list"`
25+
Key string `json:"key" bson:"key"`
26+
Type NodeType `json:"type" bson:"type"`
27+
Address string `json:"address" bson:"address"`
28+
Status NodeStatus `json:"status" bson:"status"`
29+
SessionID string `json:"session_id" bson:"session_id"`
30+
Updated int64 `json:"updated" bson:"updated"`
31+
Registered int64 `json:"registered" bson:"registered"`
32+
CapabilitiesList []capabilities.Capabilities `json:"capabilities_list" bson:"capabilities_list"`
3333
}
3434

3535
func (n *Node) String() string {

pool/strategy/kubernetes/strategy.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ func (s *Strategy) Reserve(desiredCaps capabilities.Capabilities) (pool.Node, er
4848
}
4949
node.Address = nodeAddress
5050
return *node, nil
51-
5251
}
5352

5453
func (s *Strategy) CleanUp(node pool.Node) error {

pool/strategy/persistent/strategy.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,6 @@ func (s *Strategy) registerCapabilities(nodeList []pool.Node) {
112112
for _, node := range nodeList {
113113
for _, availableCaps := range node.CapabilitiesList {
114114
s.capsComparator.Register(availableCaps)
115-
116115
}
117116
}
118117
}

storage/mongo/factory.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package mongo
2+
3+
import (
4+
"context"
5+
"errors"
6+
7+
"go.mongodb.org/mongo-driver/bson"
8+
"go.mongodb.org/mongo-driver/mongo"
9+
"go.mongodb.org/mongo-driver/mongo/options"
10+
11+
"github.com/qa-dev/jsonwire-grid/config"
12+
"github.com/qa-dev/jsonwire-grid/pool"
13+
)
14+
15+
type Factory struct {
16+
}
17+
18+
func (f *Factory) Create(cfg config.Config) (pool.StorageInterface, error) {
19+
ctx := context.Background()
20+
client, err := mongo.Connect(ctx, options.Client().ApplyURI(cfg.DB.Connection))
21+
if err != nil {
22+
panic("Database connection error: " + err.Error())
23+
}
24+
25+
err = client.Ping(ctx, nil)
26+
if err != nil {
27+
err = errors.New("Database connection not established: " + err.Error())
28+
return nil, err
29+
}
30+
31+
db := client.Database(cfg.DB.DbName)
32+
s := NewMongoStorage(db)
33+
mod := mongo.IndexModel{
34+
Keys: bson.M{
35+
"key": 1,
36+
"address": 1,
37+
},
38+
Options: options.Index().SetUnique(true).SetName("key_address_unique"),
39+
}
40+
41+
_, err = s.collection.Indexes().CreateOne(ctx, mod)
42+
if err != nil {
43+
err = errors.New("Create index error: " + err.Error())
44+
return nil, err
45+
}
46+
return s, nil
47+
}

storage/mongo/mongo.go

Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
package mongo
2+
3+
import (
4+
"context"
5+
"errors"
6+
"time"
7+
8+
"go.mongodb.org/mongo-driver/bson"
9+
"go.mongodb.org/mongo-driver/mongo"
10+
"go.mongodb.org/mongo-driver/mongo/options"
11+
12+
"github.com/qa-dev/jsonwire-grid/pool"
13+
"github.com/qa-dev/jsonwire-grid/storage"
14+
)
15+
16+
type Storage struct {
17+
collection *mongo.Collection
18+
ctx context.Context
19+
}
20+
21+
func NewMongoStorage(db *mongo.Database) *Storage {
22+
return &Storage{
23+
collection: db.Collection("grid"),
24+
ctx: context.Background(),
25+
}
26+
}
27+
28+
func (m *Storage) Add(node pool.Node, limit int) error {
29+
opts := options.Update().SetUpsert(true)
30+
filter := bson.M{"key": node.Key}
31+
update := bson.D{{Key: "$set", Value: node}}
32+
33+
if limit > 0 {
34+
return errors.New("[Mongo/Add] limit strategy temporary not supported ")
35+
}
36+
37+
result, err := m.collection.UpdateOne(m.ctx, filter, update, opts)
38+
if err != nil {
39+
return errors.New("[Mongo/Add] update node, " + err.Error())
40+
}
41+
if result.MatchedCount == 0 && result.UpsertedCount == 0 {
42+
return errors.New("[Mongo/Add] No rows was affected ")
43+
}
44+
45+
return nil
46+
}
47+
48+
func (m *Storage) ReserveAvailable(nodeList []pool.Node) (pool.Node, error) {
49+
nodeKeyList := make([]string, 0, len(nodeList))
50+
node := pool.Node{}
51+
for _, node := range nodeList {
52+
nodeKeyList = append(nodeKeyList, node.Key)
53+
}
54+
55+
filter := bson.M{"key": bson.M{"$in": nodeKeyList}, "status": pool.NodeStatusAvailable}
56+
57+
update := bson.M{"$set": bson.M{
58+
"updated": time.Now().Unix(),
59+
"status": string(pool.NodeStatusReserved),
60+
}}
61+
62+
opts := options.
63+
FindOneAndUpdate().
64+
SetReturnDocument(options.After).
65+
SetSort(bson.M{"updated": 1})
66+
67+
err := m.collection.FindOneAndUpdate(m.ctx, filter, update, opts).Decode(&node)
68+
if err != nil {
69+
err = errors.New("[Mongo/ReserveAvailable] find and update node, " + err.Error())
70+
return node, err
71+
}
72+
return node, nil
73+
}
74+
75+
func (m *Storage) SetBusy(node pool.Node, sessionID string) error {
76+
filter := bson.M{"key": node.Key}
77+
update := bson.M{"$set": bson.M{
78+
"session_id": sessionID,
79+
"updated": time.Now().Unix(),
80+
"status": string(pool.NodeStatusBusy),
81+
}}
82+
result, err := m.collection.UpdateOne(m.ctx, filter, update)
83+
if err != nil {
84+
err = errors.New("[Mongo/SetBusy] update node in collection, " + err.Error())
85+
return err
86+
}
87+
if result.ModifiedCount == 0 {
88+
return storage.ErrNotFound
89+
}
90+
return nil
91+
}
92+
93+
func (m *Storage) SetAvailable(node pool.Node) error {
94+
filter := bson.M{"key": node.Key}
95+
update := bson.M{"$set": bson.M{
96+
"updated": time.Now().Unix(),
97+
"status": string(pool.NodeStatusAvailable),
98+
}}
99+
result, err := m.collection.UpdateOne(m.ctx, filter, update)
100+
if err != nil {
101+
err = errors.New("[Mongo/SetAvailable] update node in collection, " + err.Error())
102+
return err
103+
}
104+
if result.ModifiedCount == 0 {
105+
return storage.ErrNotFound
106+
}
107+
return nil
108+
}
109+
110+
func (m *Storage) GetCountWithStatus(status *pool.NodeStatus) (int, error) {
111+
var count int
112+
var err error
113+
filter := bson.M{}
114+
if status != nil {
115+
filter = bson.M{"status": status}
116+
}
117+
cnt, err := m.collection.CountDocuments(m.ctx, filter)
118+
count = int(cnt)
119+
if err != nil {
120+
return 0, errors.New("[Mongo/GetCountWithStatus] count nodes in collection, " + err.Error())
121+
}
122+
return count, nil
123+
}
124+
125+
func (m *Storage) GetBySession(session string) (pool.Node, error) {
126+
node, err := m.getByField("session_id", session)
127+
if err != nil {
128+
return pool.Node{}, errors.New("[Mongo/GetBySession] find node in collection, " + err.Error())
129+
}
130+
return node, nil
131+
}
132+
133+
func (m *Storage) GetByAddress(address string) (pool.Node, error) {
134+
node, err := m.getByField("address", address)
135+
if err != nil {
136+
return pool.Node{}, errors.New("[Mongo/GetByAddress] find node in collection, " + err.Error())
137+
}
138+
return node, nil
139+
}
140+
141+
func (m *Storage) GetAll() ([]pool.Node, error) {
142+
nodeList := make([]pool.Node, 0)
143+
resultCursor, err := m.collection.Find(m.ctx, bson.M{})
144+
if err != nil {
145+
return nil, errors.New("[Mongo/GetAll] find node in collection, " + err.Error())
146+
}
147+
defer resultCursor.Close(m.ctx)
148+
for resultCursor.Next(m.ctx) {
149+
var result pool.Node
150+
err := resultCursor.Decode(&result)
151+
if err != nil {
152+
return nil, errors.New("[Mongo/GetAll] decode node data" + err.Error())
153+
}
154+
nodeList = append(nodeList, result)
155+
}
156+
if err := resultCursor.Err(); err != nil {
157+
return nil, errors.New("[Mongo/GetAll] iterate result" + err.Error())
158+
}
159+
return nodeList, nil
160+
}
161+
162+
func (m *Storage) Remove(node pool.Node) error {
163+
rowsAffected, err := m.collection.DeleteOne(m.ctx, bson.M{"key": node.Key})
164+
if err != nil {
165+
return errors.New("[Mongo/Remove] delete from node collection, " + err.Error())
166+
}
167+
if rowsAffected.DeletedCount == 0 {
168+
return errors.New("[Mongo/Remove] delete from node collection: affected 0 rows")
169+
}
170+
return nil
171+
}
172+
173+
func (m *Storage) UpdateAddress(node pool.Node, newAddress string) error {
174+
filter := bson.M{"key": node.Key}
175+
update := bson.M{"$set": bson.M{"address": newAddress}}
176+
result, err := m.collection.UpdateOne(m.ctx, filter, update)
177+
if err != nil {
178+
err = errors.New("[Mongo/UpdateAddress], " + err.Error())
179+
return err
180+
}
181+
if result.ModifiedCount == 0 {
182+
return storage.ErrNotFound
183+
}
184+
return nil
185+
}
186+
187+
func (m *Storage) getByField(key, value string) (pool.Node, error) {
188+
node := pool.Node{}
189+
filter := bson.M{
190+
key: bson.M{"$eq": value},
191+
}
192+
err := m.collection.FindOne(m.ctx, filter).Decode(&node)
193+
return node, err
194+
}

0 commit comments

Comments
 (0)