/
dynamodb.go
121 lines (100 loc) · 2.75 KB
/
dynamodb.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
// SPDX-FileCopyrightText: 2021 Softbear, Inc.
// SPDX-License-Identifier: AGPL-3.0-or-later
package db
import (
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/guregu/dynamo"
)
// DynamoDBDatabase groups a DynamoDB client, the guregu/dynamo handle built
// on top of it, and the three tables this package reads and writes.
type DynamoDBDatabase struct {
	// svc is the AWS SDK client that db wraps.
	svc *dynamodb.DynamoDB
	// db is the guregu/dynamo handle the table handles are derived from.
	db *dynamo.DB

	// Table handles; their names are fixed in NewDynamoDBDatabase.
	scoresTable, serversTable, statisticsTable dynamo.Table
}
// NewDynamoDBDatabase creates a DynamoDBDatabase from an AWS session.
//
// The stage string is embedded in every table name, which has the form
// "mk48-<stage>-<kind>" with kind being scores, servers or statistics.
// The returned error is currently always nil.
func NewDynamoDBDatabase(session *session.Session, stage string) (*DynamoDBDatabase, error) {
	client := dynamodb.New(session)
	conn := dynamo.NewFromIface(client)

	return &DynamoDBDatabase{
		svc:             client,
		db:              conn,
		scoresTable:     conn.Table("mk48-" + stage + "-scores"),
		serversTable:    conn.Table("mk48-" + stage + "-servers"),
		statisticsTable: conn.Table("mk48-" + stage + "-statistics"),
	}, nil
}
// UpdateScore writes score to the scores table, guarded by a condition:
// the put only applies when the stored item has no "score" attribute or its
// "score" is lower than score.Score.
//
// A failed condition is not reported as an error; nil is returned in that
// case. Any other error from the put is returned unchanged.
func (ddb *DynamoDBDatabase) UpdateScore(score Score) error {
	put := ddb.scoresTable.Put(score).If("attribute_not_exists(score) OR score < ?", score.Score)

	switch err := put.Run(); err.(type) {
	case *dynamodb.ConditionalCheckFailedException:
		// The stored score is at least as high; nothing to do.
		return nil
	default:
		// Covers both success (nil) and every other failure.
		return err
	}
}
// ReadScores scans the whole scores table and returns every item decoded
// as a Score.
//
// If iteration stops because of an error, that error is returned together
// with the scores collected up to that point. With no items and no error the
// returned slice is nil.
func (ddb *DynamoDBDatabase) ReadScores() (scores []Score, err error) {
	iter := ddb.scoresTable.Scan().Iter()
	for {
		// A fresh value per item so nothing carries over between iterations.
		var score Score
		if !iter.Next(&score) {
			break
		}
		scores = append(scores, score)
	}
	// Next returns false both at the end of the data and on failure; Err
	// tells the two apart.
	return scores, iter.Err()
}
// ReadScoresByType queries the scores table for items whose "type" key
// equals scoreType and returns them decoded as Score values.
//
// If iteration stops because of an error, that error is returned together
// with the scores collected up to that point. With no items and no error the
// returned slice is nil.
func (ddb *DynamoDBDatabase) ReadScoresByType(scoreType string) (scores []Score, err error) {
	iter := ddb.scoresTable.Get("type", scoreType).Iter()
	for {
		// A fresh value per item so nothing carries over between iterations.
		var score Score
		if !iter.Next(&score) {
			break
		}
		scores = append(scores, score)
	}
	// Next returns false both at the end of the data and on failure; Err
	// tells the two apart.
	return scores, iter.Err()
}
// UpdateServer unconditionally puts server into the servers table and
// returns the error from that put, if any.
func (ddb *DynamoDBDatabase) UpdateServer(server Server) error {
	put := ddb.serversTable.Put(server)
	return put.Run()
}
// ReadServers scans the whole servers table and returns every item decoded
// as a Server.
//
// If iteration stops because of an error, that error is returned together
// with the servers collected up to that point. With no items and no error
// the returned slice is nil.
func (ddb *DynamoDBDatabase) ReadServers() (servers []Server, err error) {
	iter := ddb.serversTable.Scan().Iter()
	for {
		// A fresh value per item so nothing carries over between iterations.
		var server Server
		if !iter.Next(&server) {
			break
		}
		servers = append(servers, server)
	}
	// Next returns false both at the end of the data and on failure; Err
	// tells the two apart.
	return servers, iter.Err()
}
// ReadServersByRegion queries the servers table for items whose "region"
// key equals region and returns them decoded as Server values.
//
// If iteration stops because of an error, that error is returned together
// with the servers collected up to that point. With no items and no error
// the returned slice is nil.
func (ddb *DynamoDBDatabase) ReadServersByRegion(region string) (servers []Server, err error) {
	iter := ddb.serversTable.Get("region", region).Iter()
	for {
		// A fresh value per item so nothing carries over between iterations.
		var server Server
		if !iter.Next(&server) {
			break
		}
		servers = append(servers, server)
	}
	// Next returns false both at the end of the data and on failure; Err
	// tells the two apart.
	return servers, iter.Err()
}
// UpdateStatistic folds statistic into the statistics table item keyed by
// its region and its timestamp truncated to a multiple of one hour.
//
// The Plays, Players and NewPlayers values are applied with Add to the
// "plays", "players" and "newPlayers" attributes of that item. The timestamp
// is presumably in milliseconds, given the hour constant; confirm against
// the Statistic definition.
func (ddb *DynamoDBDatabase) UpdateStatistic(statistic Statistic) error {
	const hourMillis = 60 * 60 * 1000

	// Divide-then-multiply drops the remainder within the hour.
	hour := (statistic.Timestamp / hourMillis) * hourMillis

	return ddb.statisticsTable.
		Update("region", statistic.Region).
		Range("timestamp", hour).
		Add("plays", statistic.Plays).
		Add("players", statistic.Players).
		Add("newPlayers", statistic.NewPlayers).
		Run()
}