Skip to content

Commit

Permalink
Merge pull request #4 from ahrtr/compaction_20230422
Browse files Browse the repository at this point in the history
add flag '--compaction' to support executing compaction before defragmentation
  • Loading branch information
ahrtr authored Apr 22, 2023
2 parents 095d8ef + 97dbcba commit 923eb00
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 82 deletions.
97 changes: 57 additions & 40 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ with `etcd-defrag [flags]` without compromising any experience.
It adds the following extra flags,
| Flag | Description |
|------------------------------|-------------|
| `---compaction` | whether execute compaction before the defragmentation, defaults to `true` |
| `--continue-on-error` | whether continue to defragment next endpoint if current one fails, defaults to `true` |
| `--etcd-storage-quota-bytes` | etcd storage quota in bytes (the value passed to etcd instance by flag --quota-backend-bytes), defaults to `2*1024*1024*1024` |
| `--defrag-rule` | defragmentation rule (etcd-defrag will run defragmentation if the rule is empty or it is evaluated to true), defaults to empty. See more details below. |
Expand All @@ -41,14 +42,17 @@ Output:
Validating configuration.
No defragmentation rule provided
Performing health check.
endpoint: http://127.0.0.1:2379, health: true, took: 2.777089ms, error:
endpoint: http://127.0.0.1:32379, health: true, took: 2.936072ms, error:
endpoint: http://127.0.0.1:22379, health: true, took: 2.810535ms, error:
endpoint: http://127.0.0.1:22379, health: true, took: 7.227642ms, error:
endpoint: http://127.0.0.1:2379, health: true, took: 13.255694ms, error:
endpoint: http://127.0.0.1:32379, health: true, took: 6.666809ms, error:
Getting members status
endpoint: http://127.0.0.1:22379, dbSize: 24576, dbSizeInUse: 24576, memberId: 91bc3c398fb3c146, leader: 8211f1d0f64f3269
1 endpoints need to be defragmented: [http://127.0.0.1:22379]
Defragmenting endpoint: http://127.0.0.1:22379
Finished defragmenting etcd member[http://127.0.0.1:22379]. took 120.359753ms
endpoint: http://127.0.0.1:22379, dbSize: 167936, dbSizeInUse: 167936, memberId: 91bc3c398fb3c146, leader: 8211f1d0f64f3269, revision: 9779, term: 2, index: 9831
Running compaction until revision: 9779 ... successful
1 endpoint(s) need to be defragmented: [http://127.0.0.1:22379]
[Before defragmentation] endpoint: http://127.0.0.1:22379, dbSize: 167936, dbSizeInUse: 94208, memberId: 91bc3c398fb3c146, leader: 8211f1d0f64f3269, revision: 9779, term: 2, index: 9832
Defragmenting endpoint "http://127.0.0.1:22379"
Finished defragmenting etcd endpoint "http://127.0.0.1:22379". took 161.063637ms
[Post defragmentation] endpoint: http://127.0.0.1:22379, dbSize: 90112, dbSizeInUse: 81920, memberId: 91bc3c398fb3c146, leader: 8211f1d0f64f3269, revision: 9779, term: 2, index: 9832
The defragmentation is successful.
```

Expand All @@ -62,17 +66,22 @@ Output:
Validating configuration.
No defragmentation rule provided
Performing health check.
endpoint: http://127.0.0.1:2379, health: true, took: 3.378808ms, error:
endpoint: http://127.0.0.1:32379, health: true, took: 3.532621ms, error:
endpoint: http://127.0.0.1:22379, health: true, took: 3.348378ms, error:
endpoint: http://127.0.0.1:2379, health: true, took: 6.368905ms, error:
endpoint: http://127.0.0.1:22379, health: true, took: 6.497803ms, error:
endpoint: http://127.0.0.1:32379, health: true, took: 6.745877ms, error:
Getting members status
endpoint: http://127.0.0.1:22379, dbSize: 24576, dbSizeInUse: 16384, memberId: 91bc3c398fb3c146, leader: 8211f1d0f64f3269
endpoint: http://127.0.0.1:32379, dbSize: 24576, dbSizeInUse: 24576, memberId: fd422379fda50e48, leader: 8211f1d0f64f3269
2 endpoints need to be defragmented: [http://127.0.0.1:22379 http://127.0.0.1:32379]
Defragmenting endpoint: http://127.0.0.1:22379
Finished defragmenting etcd member[http://127.0.0.1:22379]. took 118.112952ms
Defragmenting endpoint: http://127.0.0.1:32379
Finished defragmenting etcd member[http://127.0.0.1:32379]. took 127.034399ms
endpoint: http://127.0.0.1:22379, dbSize: 106496, dbSizeInUse: 106496, memberId: 91bc3c398fb3c146, leader: 8211f1d0f64f3269, revision: 9907, term: 2, index: 9963
endpoint: http://127.0.0.1:32379, dbSize: 167936, dbSizeInUse: 106496, memberId: fd422379fda50e48, leader: 8211f1d0f64f3269, revision: 9907, term: 2, index: 9963
Running compaction until revision: 9907 ... successful
2 endpoint(s) need to be defragmented: [http://127.0.0.1:22379 http://127.0.0.1:32379]
[Before defragmentation] endpoint: http://127.0.0.1:22379, dbSize: 110592, dbSizeInUse: 94208, memberId: 91bc3c398fb3c146, leader: 8211f1d0f64f3269, revision: 9907, term: 2, index: 9964
Defragmenting endpoint "http://127.0.0.1:22379"
Finished defragmenting etcd endpoint "http://127.0.0.1:22379". took 171.412229ms
[Post defragmentation] endpoint: http://127.0.0.1:22379, dbSize: 90112, dbSizeInUse: 81920, memberId: 91bc3c398fb3c146, leader: 8211f1d0f64f3269, revision: 9907, term: 2, index: 9964
[Before defragmentation] endpoint: http://127.0.0.1:32379, dbSize: 167936, dbSizeInUse: 94208, memberId: fd422379fda50e48, leader: 8211f1d0f64f3269, revision: 9907, term: 2, index: 9964
Defragmenting endpoint "http://127.0.0.1:32379"
Finished defragmenting etcd endpoint "http://127.0.0.1:32379". took 132.445712ms
[Post defragmentation] endpoint: http://127.0.0.1:32379, dbSize: 90112, dbSizeInUse: 81920, memberId: fd422379fda50e48, leader: 8211f1d0f64f3269, revision: 9907, term: 2, index: 9964
The defragmentation is successful.
```

Expand All @@ -86,20 +95,27 @@ Output:
Validating configuration.
No defragmentation rule provided
Performing health check.
endpoint: http://127.0.0.1:2379, health: true, took: 3.145954ms, error:
endpoint: http://127.0.0.1:32379, health: true, took: 3.193554ms, error:
endpoint: http://127.0.0.1:22379, health: true, took: 3.235073ms, error:
endpoint: http://127.0.0.1:2379, health: true, took: 4.702492ms, error:
endpoint: http://127.0.0.1:22379, health: true, took: 5.017075ms, error:
endpoint: http://127.0.0.1:32379, health: true, took: 4.747068ms, error:
Getting members status
endpoint: http://127.0.0.1:2379, dbSize: 24576, dbSizeInUse: 24576, memberId: 8211f1d0f64f3269, leader: 8211f1d0f64f3269
endpoint: http://127.0.0.1:22379, dbSize: 24576, dbSizeInUse: 16384, memberId: 91bc3c398fb3c146, leader: 8211f1d0f64f3269
endpoint: http://127.0.0.1:32379, dbSize: 24576, dbSizeInUse: 16384, memberId: fd422379fda50e48, leader: 8211f1d0f64f3269
3 endpoints need to be defragmented: [http://127.0.0.1:22379 http://127.0.0.1:32379 http://127.0.0.1:2379]
Defragmenting endpoint: http://127.0.0.1:22379
Finished defragmenting etcd member[http://127.0.0.1:22379]. took 118.562954ms
Defragmenting endpoint: http://127.0.0.1:32379
Finished defragmenting etcd member[http://127.0.0.1:32379]. took 118.424389ms
Defragmenting endpoint: http://127.0.0.1:2379
Finished defragmenting etcd member[http://127.0.0.1:2379]. took 117.058608ms
endpoint: http://127.0.0.1:2379, dbSize: 172032, dbSizeInUse: 126976, memberId: 8211f1d0f64f3269, leader: 8211f1d0f64f3269, revision: 10365, term: 2, index: 10425
endpoint: http://127.0.0.1:22379, dbSize: 122880, dbSizeInUse: 122880, memberId: 91bc3c398fb3c146, leader: 8211f1d0f64f3269, revision: 10365, term: 2, index: 10425
endpoint: http://127.0.0.1:32379, dbSize: 122880, dbSizeInUse: 122880, memberId: fd422379fda50e48, leader: 8211f1d0f64f3269, revision: 10365, term: 2, index: 10425
Running compaction until revision: 10365 ... successful
3 endpoint(s) need to be defragmented: [http://127.0.0.1:22379 http://127.0.0.1:32379 http://127.0.0.1:2379]
[Before defragmentation] endpoint: http://127.0.0.1:22379, dbSize: 126976, dbSizeInUse: 90112, memberId: 91bc3c398fb3c146, leader: 8211f1d0f64f3269, revision: 10365, term: 2, index: 10426
Defragmenting endpoint "http://127.0.0.1:22379"
Finished defragmenting etcd endpoint "http://127.0.0.1:22379". took 224.151378ms
[Post defragmentation] endpoint: http://127.0.0.1:22379, dbSize: 90112, dbSizeInUse: 81920, memberId: 91bc3c398fb3c146, leader: 8211f1d0f64f3269, revision: 10365, term: 2, index: 10426
[Before defragmentation] endpoint: http://127.0.0.1:32379, dbSize: 126976, dbSizeInUse: 90112, memberId: fd422379fda50e48, leader: 8211f1d0f64f3269, revision: 10365, term: 2, index: 10426
Defragmenting endpoint "http://127.0.0.1:32379"
Finished defragmenting etcd endpoint "http://127.0.0.1:32379". took 139.138035ms
[Post defragmentation] endpoint: http://127.0.0.1:32379, dbSize: 90112, dbSizeInUse: 81920, memberId: fd422379fda50e48, leader: 8211f1d0f64f3269, revision: 10365, term: 2, index: 10426
[Before defragmentation] endpoint: http://127.0.0.1:2379, dbSize: 172032, dbSizeInUse: 94208, memberId: 8211f1d0f64f3269, leader: 8211f1d0f64f3269, revision: 10365, term: 2, index: 10426
Defragmenting endpoint "http://127.0.0.1:2379"
Finished defragmenting etcd endpoint "http://127.0.0.1:2379". took 135.171807ms
[Post defragmentation] endpoint: http://127.0.0.1:2379, dbSize: 90112, dbSizeInUse: 81920, memberId: 8211f1d0f64f3269, leader: 8211f1d0f64f3269, revision: 10365, term: 2, index: 10426
The defragmentation is successful.
```

Expand Down Expand Up @@ -142,19 +158,20 @@ Output:
Validating configuration.
Validating the defragmentation rule: dbSize > dbQuota*80/100 || dbSize - dbSizeInUse > 200*1024*1024 ... valid
Performing health check.
endpoint: http://127.0.0.1:2379, health: true, took: 3.04562ms, error:
endpoint: http://127.0.0.1:32379, health: true, took: 3.105274ms, error:
endpoint: http://127.0.0.1:22379, health: true, took: 2.984679ms, error:
endpoint: http://127.0.0.1:2379, health: true, took: 6.993264ms, error:
endpoint: http://127.0.0.1:32379, health: true, took: 7.483368ms, error:
endpoint: http://127.0.0.1:22379, health: true, took: 49.441931ms, error:
Getting members status
endpoint: http://127.0.0.1:2379, dbSize: 24576, dbSizeInUse: 24576, memberId: 8211f1d0f64f3269, leader: 8211f1d0f64f3269
endpoint: http://127.0.0.1:22379, dbSize: 24576, dbSizeInUse: 24576, memberId: 91bc3c398fb3c146, leader: 8211f1d0f64f3269
endpoint: http://127.0.0.1:32379, dbSize: 24576, dbSizeInUse: 24576, memberId: fd422379fda50e48, leader: 8211f1d0f64f3269
3 endpoints need to be defragmented: [http://127.0.0.1:22379 http://127.0.0.1:32379 http://127.0.0.1:2379]
Defragmenting endpoint: http://127.0.0.1:22379
endpoint: http://127.0.0.1:2379, dbSize: 131072, dbSizeInUse: 131072, memberId: 8211f1d0f64f3269, leader: 8211f1d0f64f3269, revision: 10964, term: 2, index: 11028
endpoint: http://127.0.0.1:22379, dbSize: 131072, dbSizeInUse: 131072, memberId: 91bc3c398fb3c146, leader: 8211f1d0f64f3269, revision: 10964, term: 2, index: 11028
endpoint: http://127.0.0.1:32379, dbSize: 131072, dbSizeInUse: 131072, memberId: fd422379fda50e48, leader: 8211f1d0f64f3269, revision: 10964, term: 2, index: 11028
Running compaction until revision: 10964 ... successful
3 endpoint(s) need to be defragmented: [http://127.0.0.1:22379 http://127.0.0.1:32379 http://127.0.0.1:2379]
[Before defragmentation] endpoint: http://127.0.0.1:22379, dbSize: 139264, dbSizeInUse: 90112, memberId: 91bc3c398fb3c146, leader: 8211f1d0f64f3269, revision: 10964, term: 2, index: 11029
Evaluation result is false, so skipping endpoint: http://127.0.0.1:22379
Defragmenting endpoint: http://127.0.0.1:32379
[Before defragmentation] endpoint: http://127.0.0.1:32379, dbSize: 139264, dbSizeInUse: 139264, memberId: fd422379fda50e48, leader: 8211f1d0f64f3269, revision: 10964, term: 2, index: 11029
Evaluation result is false, so skipping endpoint: http://127.0.0.1:32379
Defragmenting endpoint: http://127.0.0.1:2379
[Before defragmentation] endpoint: http://127.0.0.1:2379, dbSize: 139264, dbSizeInUse: 90112, memberId: 8211f1d0f64f3269, leader: 8211f1d0f64f3269, revision: 10964, term: 2, index: 11029
Evaluation result is false, so skipping endpoint: http://127.0.0.1:2379
The defragmentation is successful.
```
Expand Down
73 changes: 58 additions & 15 deletions membersinfo.go → agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,35 +149,78 @@ type epStatus struct {
}

func (es epStatus) String() string {
return fmt.Sprintf("endpoint: %s, dbSize: %d, dbSizeInUse: %d, memberId: %x, leader: %x",
es.Ep, es.Resp.DbSize, es.Resp.DbSizeInUse, es.Resp.Header.MemberId, es.Resp.Leader)
return fmt.Sprintf("endpoint: %s, dbSize: %d, dbSizeInUse: %d, memberId: %x, leader: %x, revision: %d, term: %d, index: %d",
es.Ep, es.Resp.DbSize, es.Resp.DbSizeInUse, es.Resp.Header.MemberId, es.Resp.Leader, es.Resp.Header.Revision, es.Resp.RaftTerm, es.Resp.RaftIndex)
}

func memberStatus(gcfg globalConfig) ([]epStatus, error) {
func membersStatus(gcfg globalConfig) ([]epStatus, error) {
eps, err := endpoints(gcfg)
if err != nil {
return nil, err
}

cfgSpec := clientConfigWithoutEndpoints(gcfg)

var statusList []epStatus
for _, ep := range eps {
cfgSpec.Endpoints = []string{ep}
c, err := createClient(cfgSpec)
status, err := memberStatus(gcfg, ep)
if err != nil {
return nil, fmt.Errorf("failed to createClient: %w", err)
return nil, fmt.Errorf("failed to get member(%q) status: %w", ep, err)
}
statusList = append(statusList, status)
}

return statusList, nil
}

func memberStatus(gcfg globalConfig, ep string) (epStatus, error) {
cfgSpec := clientConfigWithoutEndpoints(gcfg)
cfgSpec.Endpoints = []string{ep}
c, err := createClient(cfgSpec)
if err != nil {
return epStatus{}, fmt.Errorf("failed to createClient: %w", err)
}

ctx, cancel := commandCtx(gcfg.commandTimeout)
resp, err := c.Status(ctx, ep)
ctx, cancel := commandCtx(gcfg.commandTimeout)
defer func() {
c.Close()
cancel()
}()
resp, err := c.Status(ctx, ep)

return epStatus{Ep: ep, Resp: resp}, err
}

func compact(gcfg globalConfig, rev int64, ep string) error {
cfgSpec := clientConfigWithoutEndpoints(gcfg)
cfgSpec.Endpoints = []string{ep}
c, err := createClient(cfgSpec)
if err != nil {
return err
}

ctx, cancel := commandCtx(gcfg.commandTimeout)
defer func() {
c.Close()
if err != nil {
return nil, fmt.Errorf("failed to get member(%q) status: %w", ep, err)
}
statusList = append(statusList, epStatus{Ep: ep, Resp: resp})
cancel()
}()

_, err = c.Compact(ctx, rev, []clientv3.CompactOption{clientv3.WithCompactPhysical()}...)
return err
}

func defragment(gcfg globalConfig, ep string) error {
cfgSpec := clientConfigWithoutEndpoints(gcfg)
cfgSpec.Endpoints = []string{ep}
c, err := createClient(cfgSpec)
if err != nil {
return err
}

return statusList, nil
ctx, cancel := commandCtx(gcfg.commandTimeout)
defer func() {
c.Close()
cancel()
}()

_, err = c.Defragment(ctx, ep)
return err
}
8 changes: 5 additions & 3 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import (
)

type globalConfig struct {
endpoints []string
endpoints []string
useClusterEndpoints bool

dialTimeout time.Duration
commandTimeout time.Duration
Expand All @@ -29,8 +30,9 @@ type globalConfig struct {
username string
password string

useClusterEndpoints bool
continueOnError bool
compaction bool

continueOnError bool

dbQuotaBytes int
defragRule string
Expand Down
76 changes: 52 additions & 24 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ func newDefragCommand() *cobra.Command {
}

defragCmd.Flags().StringSliceVar(&globalCfg.endpoints, "endpoints", []string{"127.0.0.1:2379"}, "comma separated etcd endpoints")
defragCmd.Flags().BoolVar(&globalCfg.useClusterEndpoints, "cluster", false, "use all endpoints from the cluster member list")

defragCmd.Flags().DurationVar(&globalCfg.dialTimeout, "dial-timeout", 2*time.Second, "dial timeout for client connections")
defragCmd.Flags().DurationVar(&globalCfg.commandTimeout, "command-timeout", 60*time.Second, "command timeout (excluding dial timeout)")
Expand All @@ -41,7 +42,8 @@ func newDefragCommand() *cobra.Command {
defragCmd.Flags().StringVarP(&globalCfg.dnsService, "discovery-srv-name", "", "", "service name to query when using DNS discovery")
defragCmd.Flags().BoolVar(&globalCfg.insecureDiscovery, "insecure-discovery", true, "accept insecure SRV records describing cluster endpoints")

defragCmd.Flags().BoolVar(&globalCfg.useClusterEndpoints, "cluster", false, "use all endpoints from the cluster member list")
defragCmd.Flags().BoolVar(&globalCfg.compaction, "compaction", true, "whether execute compaction before the defragmentation (defaults to true)")

defragCmd.Flags().BoolVar(&globalCfg.continueOnError, "continue-on-error", true, "whether continue to defragment next endpoint if current one fails")

defragCmd.Flags().IntVar(&globalCfg.dbQuotaBytes, "etcd-storage-quota-bytes", 2*1024*1024*1024, "etcd storage quota in bytes (the value passed to etcd instance by flag --quota-backend-bytes)")
Expand Down Expand Up @@ -75,9 +77,9 @@ func defragCommandFunc(cmd *cobra.Command, args []string) {
}

fmt.Println("Getting members status")
statusList, err := getMemberStatus(globalCfg)
statusList, err := getMembersStatus(globalCfg)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to get members' status: %v\n", err)
fmt.Fprintf(os.Stderr, "Failed to get members status: %v\n", err)
os.Exit(1)
}

Expand All @@ -87,15 +89,32 @@ func defragCommandFunc(cmd *cobra.Command, args []string) {
os.Exit(1)
}

fmt.Printf("%d endpoints need to be defragmented: %v\n", len(eps), eps)
cfg := clientConfigWithoutEndpoints(globalCfg)
failures := 0
for i, ep := range eps {
cfg.Endpoints = []string{ep}
if globalCfg.compaction {
fmt.Printf("Running compaction until revision: %d ... ", statusList[0].Resp.Header.Revision)
if err := compact(globalCfg, statusList[0].Resp.Header.Revision, eps[0]); err != nil {
fmt.Printf("failed, %v\n", err)
} else {
fmt.Println("successful")
}
} else {
fmt.Println("Skip compaction.")
}

fmt.Printf("Defragmenting endpoint: %s\n", ep)
fmt.Printf("%d endpoint(s) need to be defragmented: %v\n", len(eps), eps)
failures := 0
for _, ep := range eps {
fmt.Print("[Before defragmentation] ")
status, err := getMemberStatus(globalCfg, ep)
if err != nil {
failures++
fmt.Fprintf(os.Stderr, "Failed to get member (%q) status, error: %v\n", ep, err)
if !globalCfg.continueOnError {
break
}
continue
}

evalRet, err := evaluate(globalCfg, statusList[i])
evalRet, err := evaluate(globalCfg, status)
if !evalRet || err != nil {
if err != nil {
failures++
Expand All @@ -105,38 +124,38 @@ func defragCommandFunc(cmd *cobra.Command, args []string) {
}
continue
}

fmt.Fprintf(os.Stderr, "Evaluation result is false, so skipping endpoint: %s\n", ep)
continue
}

c, err := createClient(cfg)
fmt.Printf("Defragmenting endpoint %q\n", ep)
startTs := time.Now()
err = defragment(globalCfg, ep)
d := time.Since(startTs)
if err != nil {
failures++
fmt.Fprintf(os.Stderr, "Failed to connect to member[%s]: %v\n", ep, err)
fmt.Fprintf(os.Stderr, "Failed to defragment etcd member %q. took %s. (%v)\n", ep, d.String(), err)
if !globalCfg.continueOnError {
break
}
continue
} else {
fmt.Printf("Finished defragmenting etcd endpoint %q. took %s\n", ep, d.String())
}

ctx, cancel := commandCtx(globalCfg.commandTimeout)
startTs := time.Now()
_, err = c.Defragment(ctx, ep)
d := time.Since(startTs)
cancel()

fmt.Print("[Post defragmentation] ")
_, err = getMemberStatus(globalCfg, ep)
if err != nil {
failures++
fmt.Fprintf(os.Stderr, "Failed to defragment etcd member [%s]. took %s. (%v)\n", ep, d.String(), err)
fmt.Fprintf(os.Stderr, "Failed to get member (%q) status, error: %v\n", ep, err)
if !globalCfg.continueOnError {
break
}
} else {
fmt.Printf("Finished defragmenting etcd member[%s]. took %s\n", ep, d.String())
continue
}
}
if failures != 0 {
fmt.Fprintf(os.Stderr, "%d (total %d) endpoint(s) failed to be defragmented.\n", failures, len(eps))
os.Exit(1)
}
fmt.Println("The defragmentation is successful.")
Expand Down Expand Up @@ -189,8 +208,8 @@ func healthCheck(gcfg globalConfig) bool {
return unhealthyCount == 0
}

func getMemberStatus(gcfg globalConfig) ([]epStatus, error) {
statusList, err := memberStatus(gcfg)
func getMembersStatus(gcfg globalConfig) ([]epStatus, error) {
statusList, err := membersStatus(gcfg)
if err != nil {
return nil, err
}
Expand All @@ -200,3 +219,12 @@ func getMemberStatus(gcfg globalConfig) ([]epStatus, error) {
}
return statusList, nil
}

func getMemberStatus(gcfg globalConfig, ep string) (epStatus, error) {
status, err := memberStatus(gcfg, ep)
if err != nil {
return epStatus{}, err
}
fmt.Println(status.String())
return status, nil
}

0 comments on commit 923eb00

Please sign in to comment.