Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fault injection and Redis double-write #9

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 8 additions & 6 deletions CHANGELOG.md
@@ -1,21 +1,23 @@
#### 2022-03-16
1. package redis add "double-write" strategy mode.
2. package mock add "Fault injection".
3. package redis add "Fault injection service".
4. package mysql add "Fault injection service".
5. add web package, which is gin-gorm integration.

#### 2022-01-04
version: github.com/huaweicloud/devcloud-go v0.1.1
feature:
1. mock package add etcd mock, replace test cases that rely on real etcd.

#### 2021-12-27
version: github.com/huaweicloud/devcloud-go v0.1.0
feature:
1. add dms package, which is a high performance and high reliability kafka consumer.
2. add mock package, which can mock redis, mysql and interface.
#### 2021-12-25
1. dms: persist the first N continuous offsets in offsetNode to the database and kafka broker, this will reduce repeated consumption of messages.
2. change dms/method.go BizHandler from interface to function types.
#### 2021-12-24
1. add mock package, which contains interface mock, redis mock and mysql mock.

#### 2021-12-16
version: github.com/huaweicloud/devcloud-go v0.0.1
version: github.com/huaweicloud/devcloud-go v0.0.1
feature:
1. add dms which is a kafka consumer.

Expand Down
6 changes: 4 additions & 2 deletions common/etcd/client.go
Expand Up @@ -11,10 +11,12 @@
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* Package etcd defines EtcdClient interface, and use "go.etcd.io/etcd/client/v3"
* implements the interface.
*/

/*
Package etcd defines EtcdClient interface, and use "go.etcd.io/etcd/client/v3"
implements the interface.
*/
package etcd

import (
Expand Down
16 changes: 1 addition & 15 deletions common/etcd/mocks/EtcdClient.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions common/password/decipher.go
Expand Up @@ -15,6 +15,10 @@
* user can set customize decipher by SetDecipher function.
*/

/*
Package password defines Decipher interface, which is used to decode password,
user can set customize decipher by SetDecipher function.
*/
package password

import "sync"
Expand Down
2 changes: 1 addition & 1 deletion common/util/util.go
Expand Up @@ -11,9 +11,9 @@
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* Package util provides some util function, such as ValidateHostPort.
*/

// Package util provides some util function, such as ValidateHostPort.
package util

import (
Expand Down
6 changes: 4 additions & 2 deletions dms/consumer.go
Expand Up @@ -11,10 +11,12 @@
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* Package dms implements a kafka consumer based on sarama, user can consume messages
* asynchronous or synchronous with dms, and ensure message not lost.
*/

/*
Package dms implements a kafka consumer based on sarama, user can consume messages
asynchronous or synchronous with dms, and ensure message not lost.
*/
package dms

import (
Expand Down
2 changes: 1 addition & 1 deletion dms/example/example.go
Expand Up @@ -11,9 +11,9 @@
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* Package example provides an example for user how to use dms.
*/

// Package example provides an example for user how to use dms.
package example

import (
Expand Down
2 changes: 1 addition & 1 deletion dms/offset_manager.go
Expand Up @@ -115,7 +115,7 @@ func (m *OffsetManager) handleOffsetOnCleanUp(offsetPersist OffsetPersist) int64
m.lock.RUnlock()
offset := minKey.(int64) + m.startOffset + int64(minNode.(*OffsetNode).maxContinuous())
if err := offsetPersist.Save(m.groupId, m.topic, m.partition, offset); err != nil {
log.Printf("WARNING: groupId/topic/partition %s/%s/%s persist %d fail on clean up, %v",
log.Printf("WARNING: groupId/topic/partition %s/%s/%d persist %d fail on clean up, %v",
m.groupId, m.topic, m.partition, offset, err)
}
return offset
Expand Down
11 changes: 9 additions & 2 deletions go.mod
Expand Up @@ -6,19 +6,26 @@ require (
github.com/RoaringBitmap/roaring v0.9.4
github.com/Shopify/sarama v1.29.1
github.com/alicebob/miniredis/v2 v2.15.1
github.com/astaxie/beego v1.12.3
github.com/bwmarrin/snowflake v0.3.0
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/dolthub/go-mysql-server v0.11.0
github.com/dolthub/vitess v0.0.0-20211013185428-a8845fb919c1
github.com/emirpasic/gods v1.12.0
github.com/gin-gonic/gin v1.7.7
github.com/go-redis/redis/v8 v8.11.3
github.com/go-sql-driver/mysql v1.6.0
github.com/onsi/ginkgo v1.16.4
github.com/onsi/gomega v1.15.0
github.com/panjf2000/ants/v2 v2.4.7
github.com/panjf2000/ants/v2 v2.4.6
github.com/stretchr/testify v1.7.0
github.com/tidwall/redcon v1.4.4
go.etcd.io/etcd/api/v3 v3.5.1
go.etcd.io/etcd/client/v3 v3.5.1
go.etcd.io/etcd/server/v3 v3.5.1
golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
gopkg.in/fatih/pool.v2 v2.0.0
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
gorm.io/driver/mysql v1.3.2
gorm.io/gorm v1.23.1
)
Binary file added img/mysql-configuration.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added img/mysql-local-read-single-write.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added img/mysql-single-read-write.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added img/proxy.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added img/redis-configuration.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added img/redis-double-write.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added img/redis-local-read-single-write.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added img/redis-single-read-write.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion mas/config.go
Expand Up @@ -11,9 +11,9 @@
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* Package mas contains mas properties configuration.
*/

// Package mas contains mas properties configuration.
package mas

import "fmt"
Expand Down
191 changes: 191 additions & 0 deletions mas/injection.go
@@ -0,0 +1,191 @@
/*
* Copyright (c) Huawei Technologies Co., Ltd. 2021.
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
*/

package mas

import (
"math/rand"
"time"
)

// InjectionProperties chaos configuration
type InjectionProperties struct {
Active bool `yaml:"active"`
Duration int `yaml:"duration"`
Interval int `yaml:"interval"`
Percentage int `yaml:"percentage"`
DelayInjection *DelayInjection `yaml:"delayInjection"`
ErrorInjection *ErrorInjection `yaml:"errorInjection"`
}

// DelayInjection delay configuration
type DelayInjection struct {
Active bool `yaml:"active"`
Percentage int `yaml:"percentage"`
TimeMs int `yaml:"timeMs"`
JitterMs int `yaml:"jitterMs"`
}

// NewDelayInjection sda
func NewDelayInjection(active bool, percentage, timeMs, jitterMs int) *DelayInjection {
return &DelayInjection{
Active: active,
Percentage: percentage,
TimeMs: timeMs,
JitterMs: jitterMs,
}
}

func (d *DelayInjection) checkActive() (int, bool) {
if d.Active {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
if r.Intn(100) <= d.Percentage {
jitterMs := r.Intn(2*d.JitterMs+1) - d.JitterMs
if d.TimeMs+jitterMs < 0 {
return 0, true
}
return d.TimeMs + jitterMs, true
}
}
return 0, false
}

// InjectionError error details
type InjectionError struct {
Err error
Percentage int
}

// ErrorInjection error configuration
type ErrorInjection struct {
Active bool `yaml:"active"`
Percentage int `yaml:"percentage"`
errs []*InjectionError
}

func NewErrorInjection(active bool, percentage int) *ErrorInjection {
return &ErrorInjection{
Active: active,
Percentage: percentage,
errs: make([]*InjectionError, 0),
}
}

func (e *ErrorInjection) checkActive() (error, bool) {
if e.Active {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
if r.Intn(100) <= e.Percentage {
return e.errs[r.Intn(len(e.errs))].Err, true
}
}
return nil, false
}

// InjectionDuration ingestion period details
type InjectionDuration struct {
duration int
total int
startTimeMs int64
}

func NewInjectionDuration(duration, total int) *InjectionDuration {
return &InjectionDuration{
duration: duration,
total: total,
startTimeMs: time.Now().Unix(),
}
}

func (i *InjectionDuration) checkActive() bool {
return (time.Now().Unix()-i.startTimeMs)%int64(i.total) <= int64(i.duration)
}

// InjectionManagement chaos injection details
type InjectionManagement struct {
active bool
injectionDuration *InjectionDuration
percentage int
delayInjection *DelayInjection
errorInjection *ErrorInjection
}

func CompliancePercentage(percentage int) int {
if percentage < 0 {
return 0
}
if percentage > 100 {
return 100
}
return percentage
}

func NewInjectionManagement(chaos *InjectionProperties) *InjectionManagement {
return &InjectionManagement{
active: chaos.Active,
injectionDuration: NewInjectionDuration(chaos.Duration, chaos.Interval),
percentage: CompliancePercentage(chaos.Percentage),
delayInjection: NewDelayInjection(chaos.DelayInjection.Active,
CompliancePercentage(chaos.DelayInjection.Percentage),
chaos.DelayInjection.TimeMs,
chaos.DelayInjection.JitterMs),
errorInjection: NewErrorInjection(chaos.ErrorInjection.Active,
CompliancePercentage(chaos.ErrorInjection.Percentage)),
}
}

func (i *InjectionManagement) SetError(errs []error) {
if i.errorInjection.Active {
if errs != nil && len(errs) > 0 {
for _, err := range errs {
i.errorInjection.errs = append(i.errorInjection.errs,
&InjectionError{err, i.errorInjection.Percentage})
}
}
}
}

func (i *InjectionManagement) AddError(errs []*InjectionError) {
if i.errorInjection.Active {
if errs != nil && len(errs) > 0 {
for _, err := range errs {
err.Percentage = CompliancePercentage(err.Percentage)
}
i.errorInjection.errs = append(i.errorInjection.errs, errs...)
}
}
}

// Inject chaos injection triggering
func (i *InjectionManagement) Inject() error {
if i == nil {
return nil
}
if !i.active {
return nil
}
if !i.injectionDuration.checkActive() {
return nil
}
r := rand.New(rand.NewSource(time.Now().UnixNano()))
if r.Intn(100) <= i.percentage {
if err, active := i.errorInjection.checkActive(); active {
return err
}
if delay, active := i.delayInjection.checkActive(); active {
time.Sleep(time.Millisecond * time.Duration(delay))
}
}
return nil
}