/
database.go
229 lines (215 loc) · 6.82 KB
/
database.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
package b2schema
import (
"encoding/json"
"errors"
"log"
"time"
"github.com/rs/xid"
rdb "github.com/tecbot/gorocksdb"
"github.com/thoas/go-funk"
)
// TODO: - up broadcast to global index - grpc notify whole clusters
// B2Database babydb数据库结构体
type B2Database struct {
// 数据库名称
Database string `json:"Database"`
// 数据库表列表
TableList []string `json:"TableList,omitempty"`
// 数据库全局唯一ID,用于作为在rocksdb中的真实数据库ID
DatabaseID string `json:"DatabaseID"`
// rocksdb只读连接结构体
RocksDbReadConn *rdb.DB `json:"-"`
// rocksdb事务连接结构体
RocksDbWriteConn *rdb.TransactionDB `json:"-"`
// 创建时间
CreateTime time.Time `json:"CreateTime,omitempty"`
// 打开时间
OpenTime time.Time `json:"OpenTime,omitempty"`
}
// NewDatabase 创建一个新的数据库
// name 数据库名称
// meta 元数据库连接结构体指针
func NewDatabase(name string, meta *MetaDBSource) (*B2Database, error) {
// 加锁,保证修改元数据时是原子操作
meta.Mu.Lock()
defer meta.Mu.Unlock()
db, err := meta.GetDatabase(name)
if err == nil {
log.Fatalf("数据库名称已经存在: %s\n", name)
return nil, errors.New("database name duplicated")
}
guid := xid.New()
db = &B2Database{
Database: name,
TableList: nil,
DatabaseID: guid.String(),
RocksDbReadConn: nil,
RocksDbWriteConn: nil,
CreateTime: time.Now(),
OpenTime: time.Now(),
}
if err = meta.PutDatabase(db); err != nil {
log.Fatalf("创建数据库META时发生错误: %v\n", err)
return nil, err
}
opts := rdb.NewDefaultOptions()
opts.SetCreateIfMissing(true)
opts.SetErrorIfExists(true)
create, err := rdb.OpenDb(opts, db.DatabaseID)
if err != nil {
log.Fatalf("创建数据库文件时发生错误: %v\n", err)
return nil, err
}
create.Close()
// TODO: up broadcast meta data to global index
return db, nil
}
// NewDatabaseAndOpen 新建数据库并打开
func NewDatabaseAndOpen(name string, meta *MetaDBSource) (*B2Database, error) {
b2db, err := NewDatabase(name, meta)
if err != nil {
log.Fatalf("新建数据库时发生错误: %v\n", err)
return nil, err
}
b2db, err = b2db.OpenConnection()
if err != nil {
log.Fatalf("打开数据库时发生错误: %v\n", err)
return nil, err
}
return b2db, nil
}
// OpenConnection 打开B2DB数据库连接
func (b2db *B2Database) OpenConnection() (*B2Database, error) {
opts := rdb.NewDefaultOptions()
topts := rdb.NewDefaultTransactionDBOptions()
write, err := rdb.OpenTransactionDb(opts, topts, b2db.DatabaseID)
if err != nil {
log.Fatalf("打开数据库连接时发生错误: %v\n", err)
return nil, err
}
opts.SetCreateIfMissing(false)
opts.SetErrorIfExists(false)
read, err := rdb.OpenDbForReadOnly(opts, b2db.DatabaseID, false)
b2db.RocksDbReadConn = read
b2db.RocksDbWriteConn = write
b2db.OpenTime = time.Now()
return b2db, nil
}
// DropDatabase 删除数据库
func DropDatabase(name string, meta *MetaDBSource) error {
meta.Mu.Lock()
defer meta.Mu.Unlock()
b2db, err := meta.GetDatabase(name)
if err != nil {
log.Fatalf("找不到要删除的数据库: %s\n", name)
return err
}
b2db.Close()
opts := rdb.NewDefaultOptions()
if err = rdb.DestroyDb(b2db.DatabaseID, opts); err != nil {
log.Fatalf("删除数据库文件时发生错误: %v\n", err)
return err
}
if err = meta.DelDatabase(name); err != nil {
log.Fatalf("删除数据库META记录时发生错误,数据一致性可能已经破坏: %v\n", err)
return err
}
return nil
}
// GetTable 在元数据中获取某个数据库表META内容
func (b2db *B2Database) GetTable(tableName string, meta *MetaDBSource) (*B2Table, error) {
return meta.getTable(b2db.Database, tableName)
}
// AddTable 在数据库中添加表
func (b2db *B2Database) AddTable(table *B2Table, meta *MetaDBSource) error {
if !table.validate() {
log.Printf("数据库表 %s META内容不完整或有错误\n", table.TableName)
return errors.New("invalid table structure data")
}
wopts := rdb.NewDefaultWriteOptions()
topts := rdb.NewDefaultTransactionOptions()
meta.Mu.Lock()
defer meta.Mu.Unlock()
txn := meta.rocksDB.TransactionBegin(wopts, topts, nil)
key := []byte(b2db.Database + "/" + table.TableName)
value, err := json.Marshal(table)
if err != nil {
log.Fatalf("将表 %s META数据转换为json时发生错误: %v\n", table.TableName, err)
return err
}
err = txn.Put(key, value)
if err != nil {
log.Fatalf("在数据库 %s 中创建表 %s 时发生错误: %v\n", b2db.Database, table.TableName, err)
_ = txn.Rollback()
return err
}
b2db.TableList = append(b2db.TableList, table.TableName)
key = []byte(b2db.Database)
value, err = json.Marshal(b2db)
if err != nil {
log.Fatalf("将数据库 %s META数据转换为json时发生错误: %v\n", b2db.Database, err)
_ = txn.Rollback()
b2db.TableList = b2db.TableList[:len(b2db.TableList)-1]
return err
}
if err = txn.Put(key, value); err != nil {
log.Fatalf("更新数据库 %s 的META时出错: %v\n", b2db.Database, err)
_ = txn.Rollback()
b2db.TableList = b2db.TableList[:len(b2db.TableList)-1]
return err
}
_ = txn.Commit()
// TODO: up broadcast meta data to global index
return nil
}
// RemoveTable 从数据库中移除表
func (b2db *B2Database) RemoveTable(tableName string, meta *MetaDBSource) error {
wopts := rdb.NewDefaultWriteOptions()
topts := rdb.NewDefaultTransactionOptions()
meta.Mu.Lock()
defer meta.Mu.Unlock()
txn := meta.rocksDB.TransactionBegin(wopts, topts, nil)
// TODO: remove all KV storage owned by the table
key := []byte(b2db.Database + "/" + tableName)
if err := txn.Delete(key); err != nil {
log.Fatalf("删除数据库 %s 中的表 %s 的元数据时发生错误: %v\n", b2db.Database, tableName, err)
_ = txn.Rollback()
return err
}
pos := funk.IndexOf(b2db.TableList, tableName)
if pos == -1 {
log.Printf("数据库表 %s 在数据库 %s META数据中已经被移除\n", tableName, b2db.Database)
_ = txn.Commit()
return nil
}
restore := make([]string, len(b2db.TableList))
copy(restore, b2db.TableList)
copy(b2db.TableList[pos:], b2db.TableList[pos+1:])
b2db.TableList = b2db.TableList[:len(b2db.TableList)-1]
key = []byte(b2db.Database)
value, err := json.Marshal(b2db)
if err != nil {
log.Fatalf("数据库 %s META数据转换json时发生错误: %v\n", b2db.Database, err)
_ = txn.Rollback()
b2db.TableList = restore
return err
}
if err = txn.Put(key, value); err != nil {
log.Fatalf("更新数据库 %s META数据时发生错误: %v\n", b2db.Database, err)
_ = txn.Rollback()
b2db.TableList = restore
return err
}
_ = txn.Commit()
// TODO: up broadcast meta data to global index
return nil
}
// Close 关闭数据库连接
func (b2db *B2Database) Close() {
if b2db.RocksDbReadConn != nil {
b2db.RocksDbReadConn.Close()
}
if b2db.RocksDbWriteConn != nil {
b2db.RocksDbWriteConn.Close()
}
}