Skip to content

Commit

Permalink
feat: add big key tool (OpenAtomFoundation#2195)
Browse files Browse the repository at this point in the history
* feat: add big key tool

Signed-off-by: sjcsjc123 <1401189096@qq.com>

* modify heap to maxHeap

Signed-off-by: sjcsjc123 <1401189096@qq.com>

* modify comment

Signed-off-by: sjcsjc123 <1401189096@qq.com>

* add ci test

Signed-off-by: sjcsjc123 <1401189096@qq.com>

* add compress and decompress

Signed-off-by: sjcsjc123 <1401189096@qq.com>

* fix ci

Signed-off-by: sjcsjc123 <1401189096@qq.com>

* fix ci

Signed-off-by: sjcsjc123 <1401189096@qq.com>

* fix ci

Signed-off-by: sjcsjc123 <1401189096@qq.com>

* fix ci

Signed-off-by: sjcsjc123 <1401189096@qq.com>

* fix ci

Signed-off-by: sjcsjc123 <1401189096@qq.com>

---------

Signed-off-by: sjcsjc123 <1401189096@qq.com>
  • Loading branch information
sjcsjc123 committed Dec 21, 2023
1 parent 1f9b0bf commit fc68552
Show file tree
Hide file tree
Showing 13 changed files with 1,261 additions and 3 deletions.
12 changes: 9 additions & 3 deletions .github/workflows/pika.yml
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,9 @@ jobs:
- name: Run Go E2E Tests
working-directory: ${{ github.workspace }}/build
run: |
cd ../tests/integration/
cd ../tools/pika_keys_analysis/
go test -v ./...
cd ../../tests/integration/
chmod +x integrate_test.sh
sh integrate_test.sh
Expand Down Expand Up @@ -152,7 +154,9 @@ jobs:
- name: Run Go E2E Tests
working-directory: ${{ github.workspace }}/build
run: |
cd ../tests/integration/
cd ../tools/pika_keys_analysis/
go test -v ./...
cd ../../tests/integration/
chmod +x integrate_test.sh
sh integrate_test.sh
Expand Down Expand Up @@ -210,7 +214,9 @@ jobs:
- name: Run Go E2E Tests
working-directory: ${{ github.workspace }}/build
run: |
cd ../tests/integration/
cd ../tools/pika_keys_analysis/
go test -v ./...
cd ../../tests/integration/
chmod +x integrate_test.sh
sh integrate_test.sh
Expand Down
138 changes: 138 additions & 0 deletions tools/pika_keys_analysis/app.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package pika_keys_analysis

import (
"context"
"fmt"
"strings"
"time"

"github.com/desertbit/grumble"
"github.com/fatih/color"
)

var App = grumble.New(&grumble.Config{
Name: "pika_keys_analysis",
Description: "A tool for analyzing keys in Pika",
HistoryFile: "/tmp/.pika_keys_analysis_history",
Prompt: "pika_keys_analysis > ",
HistoryLimit: 100,
ErrorColor: color.New(color.FgRed, color.Bold, color.Faint),
HelpHeadlineColor: color.New(color.FgGreen),
HelpHeadlineUnderline: false,
HelpSubCommands: true,
PromptColor: color.New(color.FgBlue, color.Bold),
Flags: func(f *grumble.Flags) {},
})

func init() {
App.OnInit(func(a *grumble.App, fm grumble.FlagMap) error {
return nil
})
App.SetPrintASCIILogo(func(a *grumble.App) {
fmt.Println(strings.Join([]string{`
............. .... ..... ..... .....
################# #### ##### ##### #######
#### ##### #### ##### ##### #########
#### ##### #### ##### ##### #### #####
#### ##### #### ##### ##### #### #####
################ #### ##### ##### #### #####
#### #### ##### ##### #################
#### #### ##### ###### ##### #####
#### #### ##### ###### ##### #####
`}, "\r\n"))
})
register(App)
}

func register(app *grumble.App) {
app.AddCommand(&grumble.Command{
Name: "bigKey",
Help: "list the big keys",
LongHelp: "list the big keys",
Run: func(c *grumble.Context) error {
listBigKeys, err := PikaInstance.ListBigKeysByScan(context.Background())
if err != nil {
return err
}
start := time.Now()
for keyType, data := range listBigKeys {
fmt.Printf("Type: %s, Head: %d\n", keyType, Head)
if len(data.GetTopN(Head)) == 0 {
fmt.Println("No big key found")
}
for _, v := range data.GetTopN(Head) {
fmt.Printf("Key : %s, Size: %d, From: %s\n", v.Key, v.UsedSize, v.Client)
}
}
end := time.Now()
if PrintKeyNum {
fmt.Println("Total Key Number:", PikaInstance.GetTotalKeyNumber())
}
fmt.Println("Cost Time:", end.Sub(start))
return nil
},
})

app.AddCommand(&grumble.Command{
Name: "apply",
Help: "Apply the settings to Pika",
LongHelp: "Apply the settings to Pika",
Args: func(a *grumble.Args) {
a.String("filename", "The configuration file")
},
Run: func(c *grumble.Context) error {
filename := c.Args.String("filename")
return Init(filename)
},
})

app.AddCommand(&grumble.Command{
Name: "compress",
Help: "Compress the big keys",
LongHelp: "Compress the big keys and store them to pika",
Args: func(a *grumble.Args) {
a.String("key", "The key to compress")
},
Run: func(c *grumble.Context) error {
key := c.Args.String("key")
return PikaInstance.CompressKey(context.Background(), key)
},
})

app.AddCommand(&grumble.Command{
Name: "decompress",
Help: "Decompress the big keys",
LongHelp: "Decompress the big keys and store them to pika",
Args: func(a *grumble.Args) {
a.String("key", "The key to decompress")
},
Flags: func(f *grumble.Flags) {
f.Bool("s", "save", false, "Save the decompressed value to pika")
},
Run: func(c *grumble.Context) error {
key := c.Args.String("key")
save := c.Flags.Bool("save")
decompressKey, err := PikaInstance.DecompressKey(context.Background(), key, save)
if err != nil {
return err
}
fmt.Printf("Key: %s, Decompress: %s\n", key, decompressKey)
return nil
},
})

app.AddCommand(&grumble.Command{
Name: "recover",
Help: "Recover the big keys",
LongHelp: "Recover the big keys and store them to pika",
Args: func(a *grumble.Args) {
a.String("key", "The key to recover")
a.String("newKey", "The new key to store the recovered value")
},
Run: func(c *grumble.Context) error {
key := c.Args.String("key")
newKey := c.Args.String("newKey")
return PikaInstance.RecoverKey(context.Background(), key, newKey)
},
})
}
39 changes: 39 additions & 0 deletions tools/pika_keys_analysis/cli/Readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# What is this?
This is a tool to analyze the keys of a pika cluster.
# How to use?
## 1. Install
```shell
go build -o pika_keys_analysis main.go
```
## 2. Start
```shell
./pika_keys_analysis config.yaml
```
## 3. List big keys
```shell
bigKey
```
## 4. Apply Config
```shell
apply config.yaml
```
## 5. Compress Key
```shell
compress <key>
```
## 6. Decompress Key
- not save to pika
```shell
decompress <key>
```
- save to pika
```shell
decompress -s <key>
```
## 7. Recover Key
```shell
recover <from> <to>
```
# Notice

When using compression and decompression functions, errors in operation may cause duplicate compression or decompression, and the files used for recovery may be overwritten. If they are overwritten, the decompress command can be used to reach a state where decompression cannot continue, and then continue to compress to use the recover command normally
21 changes: 21 additions & 0 deletions tools/pika_keys_analysis/cli/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
pika:
- addr: 127.0.0.1:9221
db: 0
password: ""

- addr: 127.0.0.1:9221
db: 1
password: ""

scan-size: 1000 # scan size per time
concurrency: 1000 # goroutine num
head: 30 # show top head keys
type:
- string
- hash
- list
- set
- zset
memory: 2000 # Memory limit, unit: MB
print: true # Print key number or not, will use keys command
save: ./save/ # Save dir path, will save key value pairs when compress
24 changes: 24 additions & 0 deletions tools/pika_keys_analysis/cli/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package main

import (
"fmt"
"os"
"pika/tools/pika_keys_analysis"

"github.com/desertbit/grumble"
)

func main() {
if len(os.Args) != 2 {
fmt.Println("Usage: pika_keys_analysis <config file>")
os.Exit(1)
}
err := pika_keys_analysis.Init(os.Args[1])
if err != nil {
fmt.Println(err)
os.Exit(1)
}

os.Args = os.Args[0:1]
grumble.Main(pika_keys_analysis.App)
}
72 changes: 72 additions & 0 deletions tools/pika_keys_analysis/compress.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package pika_keys_analysis

import (
"bytes"
"compress/gzip"
"io/ioutil"
"os"
"path/filepath"
)

func compress(data []byte) ([]byte, error) {
var compressedData bytes.Buffer
writer := gzip.NewWriter(&compressedData)

_, err := writer.Write(data)
if err != nil {
return nil, err
}

err = writer.Close()
if err != nil {
return nil, err
}

return compressedData.Bytes(), nil
}

func decompress(compressedData []byte) ([]byte, error) {
reader, err := gzip.NewReader(bytes.NewReader(compressedData))
if err != nil {
return nil, err
}

decompressedData, err := ioutil.ReadAll(reader)
if err != nil {
return nil, err
}

err = reader.Close()
if err != nil {
return nil, err
}

return decompressedData, nil
}

func isCompressed(data []byte) bool {
return len(data) > 2 && data[0] == 0x1f && data[1] == 0x8b
}

// saveLocal saves the key-value pair to local file system.
func saveLocal(key []byte, value []byte) error {
_, err := os.ReadDir(Save)
if err != nil {
if os.IsNotExist(err) {
err = os.MkdirAll(Save, 0755)
if err != nil {
return err
}
} else {
return err
}
}
filename := filepath.Join(Save, string(key))
file, err := os.Create(filename)
if err != nil {
return err
}
defer file.Close()
_, err = file.Write(value)
return err
}
56 changes: 56 additions & 0 deletions tools/pika_keys_analysis/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package pika_keys_analysis

import (
"os"

"gopkg.in/yaml.v3"
)

var (
PikaInstance *Pika
ScanSize = 1000
GoroutineNum = 100
Head = 10
Type = []string{"string", "hash", "list", "set", "zset"}
MemoryLimit = 1024 * 1024 * 200
PrintKeyNum = false
Save = "./save/"
)

type Config struct {
PikaConfig []PikaConfig `yaml:"pika"`
Concurrency int `yaml:"concurrency"`
ScanSize int `yaml:"scan-size"`
Head int `yaml:"head"`
MemoryLimit int `yaml:"memory"`
Type []string `yaml:"type"`
PrintKeyNum bool `yaml:"print"`
Save string `yaml:"save"`
}

type PikaConfig struct {
Addr string `yaml:"addr"`
Password string `yaml:"password"`
DB int `yaml:"db"`
}

func Init(filename string) error {
bytes, err := os.ReadFile(filename)
if err != nil {
return err
}
config := Config{}
err = yaml.Unmarshal(bytes, &config)
if err != nil {
return err
}
PikaInstance = NewPika(config.PikaConfig)
ScanSize = config.ScanSize
GoroutineNum = config.Concurrency
Head = config.Head
Type = config.Type
MemoryLimit = config.MemoryLimit * 1024 * 1024
PrintKeyNum = config.PrintKeyNum
Save = config.Save
return nil
}
Loading

0 comments on commit fc68552

Please sign in to comment.