/
restore.go
112 lines (108 loc) · 2.73 KB
/
restore.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package core
import (
"context"
"fmt"
"os"
"strings"
"time"
"github.com/immesys/go-ceph/rados"
"github.com/urfave/cli"
)
// Restore is the CLI action that replays the containers read from the global
// sharder (gsharder) into ceph and/or etcd.
//
// Flags read from c:
//   - "skip-etcd" / "skip-ceph": do not connect to, or write into, that
//     backend. Setting both is rejected and the process exits with status 1.
//   - "restore-pool": repeatable "SRC:DST" pairs. A ceph object whose Pool is
//     SRC is written into pool DST; objects whose pool has no mapping are
//     skipped and counted as skipped.
//
// Errors from ceph and etcd calls are handed to ChkErr, which is defined
// elsewhere in the package (presumably it aborts the process; confirm).
// Restore itself always returns nil.
func Restore(c *cli.Context) error {
	if c.Bool("skip-etcd") && c.Bool("skip-ceph") {
		fmt.Printf("you can't skip both ceph and etcd\n")
		os.Exit(1)
	}

	// Build the source-pool -> destination-pool mapping (stored in the
	// package-level gpoolsmap) and the list of destination pools.
	gpoolsmap = make(map[string]string)
	// Kept as a non-nil empty slice, as before, since it is passed on to
	// checkPoolsExist whose handling of nil is not visible from here.
	restorepools := []string{}
	for _, arg := range c.StringSlice("restore-pool") {
		parts := strings.Split(arg, ":")
		if len(parts) != 2 {
			// The newline keeps the final line of output terminated before exit.
			fmt.Printf("invalid restore-pool argument %q\n", arg)
			os.Exit(1)
		}
		gpoolsmap[parts[0]] = parts[1]
		restorepools = append(restorepools, parts[1])
	}

	getPassphrase()
	createFrameWriter()
	createReadSharder(c)

	skipetcd := c.Bool("skip-etcd")
	skipceph := c.Bool("skip-ceph")
	if !skipetcd {
		fmt.Printf("connecting etcd\n")
		connectEtcd(c)
	}
	if !skipceph {
		fmt.Printf("connecting ceph\n")
		connectCeph(c)
		checkPoolsExist(nil, restorepools)
	}

	// One IOContext per destination pool, opened on first use and reused.
	// NOTE(review): these contexts are never released in this function;
	// harmless if the process exits right after the restore, confirm.
	poolconns := make(map[string]*rados.IOContext)
	cephcount := 0
	skipcephcount := 0
	etcdcount := 0

	// The read sits in the post statement so that every `continue` below
	// still advances to the next container.
	for container, more := gsharder.Read(); more; container, more = gsharder.Read() {
		switch rec := container.(type) {
		case *CephObject:
			if skipceph {
				continue
			}
			restorepool, ok := gpoolsmap[rec.Pool]
			if !ok {
				skipcephcount++
				continue
			}
			ioctx, ok := poolconns[restorepool]
			if !ok {
				var err error
				ioctx, err = gcephconn.OpenIOContext(restorepool)
				ChkErr(err)
				poolconns[restorepool] = ioctx
			}
			// The context is shared across objects, so the namespace must be
			// set for every object.
			ioctx.SetNamespace(rec.Namespace)
			// NOTE(review): an object with empty content and no xattr/omap
			// entries results in no write at all; confirm this is intended.
			if len(rec.Content) > 0 {
				ChkErr(ioctx.WriteFull(rec.Name, rec.Content))
			}
			for _, kv := range rec.XATTRData {
				ChkErr(ioctx.SetXattr(rec.Name, kv.Key, kv.Value))
			}
			for _, kv := range rec.OMAPData {
				ChkErr(ioctx.SetOmap(rec.Name, map[string][]byte{kv.Key: kv.Value}))
			}
			// Count and report only after the object has been written, so the
			// progress line does not include an object still in flight.
			cephcount++
			if cephcount%200 == 0 {
				fmt.Printf("Restored %d objects, skipped %d objects\n", cephcount, skipcephcount)
			}
		case *EtcdRecords:
			if skipetcd {
				continue
			}
			etcdcount += len(rec.KVz)
			for _, kv := range rec.KVz {
				ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
				_, err := getcd.Put(ctx, kv.Key, string(kv.Value))
				// Release the timeout context before error handling so it is
				// freed on the failure path as well.
				cancel()
				ChkErr(err)
			}
		default:
			fmt.Printf("unknown container!\n")
		}
	}

	fmt.Printf("Restore complete: restored %d ceph objects, skipped %d ceph objects and restored %d etcd records\n", cephcount, skipcephcount, etcdcount)
	return nil
}