client_data_table.go
// Copyright (C) 2020 Finogeeks Co., Ltd
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License, version 3,
// as published by the Free Software Foundation.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package syncapi

import (
	"context"
	"database/sql"

	"github.com/finogeeks/ligase/common"
	"github.com/finogeeks/ligase/model/dbtypes"
	"github.com/finogeeks/ligase/model/types"
	log "github.com/finogeeks/ligase/skunkworks/log"
)

const clientDataStreamSchema = `
-- This sequence is shared between all the tables generated from kafka logs.
-- CREATE SEQUENCE IF NOT EXISTS syncapi_stream_id;

-- Stores the types of account data that a user has set globally and in each room,
-- and the stream ID when that type was last updated.
CREATE TABLE IF NOT EXISTS syncapi_client_data_stream (
	-- An incrementing ID which denotes the position in the log that this event resides at.
	-- id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'),
	id BIGINT PRIMARY KEY,
	-- The ID of the user the data belongs to.
	user_id TEXT NOT NULL,
	room_id TEXT,
	-- The type of the data.
	data_type TEXT,
	stream_type TEXT NOT NULL,
	CONSTRAINT syncapi_client_data_stream_unique UNIQUE (user_id, room_id, data_type, stream_type)
);

CREATE UNIQUE INDEX IF NOT EXISTS syncapi_client_data_stream_id_idx ON syncapi_client_data_stream(id);
CREATE INDEX IF NOT EXISTS syncapi_client_data_user_id_idx ON syncapi_client_data_stream(user_id);
`

const insertClientDataStreamSQL = "" +
	"INSERT INTO syncapi_client_data_stream (id, user_id, room_id, data_type, stream_type) VALUES ($1, $2, $3, $4, $5)" +
	" ON CONFLICT ON CONSTRAINT syncapi_client_data_stream_unique" +
	" DO UPDATE SET id = EXCLUDED.id" +
	" RETURNING id"

const selectHistoryClientDataStreamSQL = "" +
	"SELECT id, user_id, room_id, data_type, stream_type FROM syncapi_client_data_stream" +
	" WHERE user_id = $1 AND stream_type != ''" +
	" ORDER BY id DESC LIMIT $2"
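
// clientDataStreamStatements bundles the prepared statements used to read and
// write rows of the syncapi_client_data_stream table.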
type clientDataStreamStatements struct {
	db                                *Database
	insertClientDataStreamStmt        *sql.Stmt
	selectHistoryClientDataStreamStmt *sql.Stmt
}
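
// getSchema returns the DDL that creates the table and its indexes.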
func (s *clientDataStreamStatements) getSchema() string {
	return clientDataStreamSchema
}
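
// prepare compiles the insert and select statements against db and keeps a
// reference to the owning Database for async writes.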
func (s *clientDataStreamStatements) prepare(db *sql.DB, d *Database) (err error) {
	s.db = d
	if s.insertClientDataStreamStmt, err = db.Prepare(insertClientDataStreamSQL); err != nil {
		return
	}
	if s.selectHistoryClientDataStreamStmt, err = db.Prepare(selectHistoryClientDataStreamSQL); err != nil {
		return
	}
	return
}
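
// insertClientDataStream upserts one client data stream row. When the owning
// Database has AsyncSave enabled, the write is packaged as a DB event and
// queued rather than executed here; otherwise the prepared upsert runs inline
// and returns the stored stream position.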
func (s *clientDataStreamStatements) insertClientDataStream(
	ctx context.Context, id int64,
	userID, roomID, dataType, streamType string,
) (pos int64, err error) {
	if s.db.AsyncSave {
		// Asynchronous mode: wrap the row in a DB event and hand it to the
		// database event writer, keyed by a hash of the user ID (presumably so
		// that writes for the same user stay ordered). The caller gets the
		// proposed id back immediately.
		var update dbtypes.DBEvent
		update.Category = dbtypes.CATEGORY_SYNC_DB_EVENT
		update.Key = dbtypes.SyncClientDataInsertKey
		update.SyncDBEvents.SyncClientDataInsert = &dbtypes.SyncClientDataInsert{
			ID:         id,
			UserID:     userID,
			RoomID:     roomID,
			DataType:   dataType,
			StreamType: streamType,
		}
		update.SetUid(int64(common.CalcStringHashCode64(userID)))
		s.db.WriteDBEventWithTbl(ctx, &update, "syncapi_client_data_stream")
		return id, nil
	}

	// Synchronous mode: upsert directly and return the stored stream position.
	err = s.insertClientDataStreamStmt.QueryRowContext(ctx, id, userID, roomID, dataType, streamType).Scan(&pos)
	return
}
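
// onInsertClientDataStream executes the upsert directly; judging by its name,
// it is the handler invoked when a queued async insert event is applied.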
func (s *clientDataStreamStatements) onInsertClientDataStream(
	ctx context.Context, id int64,
	userID, roomID, dataType, streamType string,
) (pos int64, err error) {
	err = s.insertClientDataStreamStmt.QueryRowContext(ctx, id, userID, roomID, dataType, streamType).Scan(&pos)
	return
}
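
// selectHistoryStream returns up to limit of the user's most recent client
// data stream entries (newest first, skipping rows with an empty stream_type),
// along with the stream position of each entry.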
func (s *clientDataStreamStatements) selectHistoryStream(
	ctx context.Context,
	userID string,
	limit int,
) (streams []types.ActDataStreamUpdate, offset []int64, err error) {
	rows, err := s.selectHistoryClientDataStreamStmt.QueryContext(ctx, userID, limit)
	if err != nil {
		log.Errorf("clientDataStreamStatements.selectHistoryStream err: %v", err)
		return
	}
	defer rows.Close()

	streams = []types.ActDataStreamUpdate{}
	offset = []int64{}
	for rows.Next() {
		var stream types.ActDataStreamUpdate
		var streamPos int64
		if err := rows.Scan(&streamPos, &stream.UserID, &stream.RoomID, &stream.DataType, &stream.StreamType); err != nil {
			return nil, nil, err
		}
		streams = append(streams, stream)
		offset = append(offset, streamPos)
	}
	// Surface any error hit while iterating, e.g. a dropped connection.
	err = rows.Err()
	return
}
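
// A minimal usage sketch, assuming a *sql.DB and *Database have been set up
// elsewhere in this package; the user and room IDs below are hypothetical:
//
//	var stmts clientDataStreamStatements
//	if err := stmts.prepare(sqlDB, database); err != nil {
//		// handle error
//	}
//	pos, err := stmts.insertClientDataStream(ctx, 42,
//		"@alice:example.org", "!room:example.org", "m.fully_read", "accountData")
//	streams, offsets, err := stmts.selectHistoryStream(ctx, "@alice:example.org", 50)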