Skip to content

Commit

Permalink
优化tryReadNewValues逻辑
Browse files Browse the repository at this point in the history
  • Loading branch information
smthing committed Jun 5, 2024
1 parent 26bdeb9 commit 52706c3
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 17 deletions.
76 changes: 60 additions & 16 deletions driverbox/helper/shadow/shadow.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ var UnknownDeviceErr = errors.New("unknown device")
var DeviceRepeatErr = errors.New("device already exists")
var BindPointDataErr = errors.New("bind online point data can't be parsed")
var UnknownDevicePointErr = errors.New("unknown device point")
var UnknownDeviceLockErr = errors.New("unknown device lock")

type OnlineChangeCallback func(deviceId string, online bool) // 设备上/下线回调

Expand Down Expand Up @@ -50,12 +51,15 @@ type deviceShadow struct {
m *sync.Map
ticker *time.Ticker
handlerFunc OnlineChangeCallback
//设备同步锁
deviceLock *sync.Map
}

func NewDeviceShadow() DeviceShadow {
shadow := &deviceShadow{
m: &sync.Map{},
ticker: time.NewTicker(time.Second),
m: &sync.Map{},
ticker: time.NewTicker(time.Second),
deviceLock: &sync.Map{},
}
go shadow.checkOnOff()
return shadow
Expand All @@ -66,6 +70,7 @@ func (d *deviceShadow) AddDevice(device Device) (err error) {
return DeviceRepeatErr
}
device.updatedAt = time.Now()
d.deviceLock.Store(device.id, &sync.Mutex{})
d.m.Store(device.id, device)
return nil
}
Expand All @@ -84,6 +89,16 @@ func (d *deviceShadow) HasDevice(deviceId string) bool {
}

func (d *deviceShadow) SetDevicePoint(deviceId, pointName string, value interface{}) (err error) {
deviceLock, ok := d.deviceLock.Load(deviceId)
if !ok {
return UnknownDeviceLockErr
}
lock := deviceLock.(*sync.Mutex)
lock.Lock()
defer func() {
lock.Unlock()
}()

deviceAny, ok := d.m.Load(deviceId)
if !ok {
return UnknownDeviceErr
Expand All @@ -95,11 +110,23 @@ func (d *deviceShadow) SetDevicePoint(deviceId, pointName string, value interfac
// update point value
device.updatedAt = time.Now()
device.disconnectTimes = 0
device.points.Store(pointName, DevicePoint{
Name: pointName,
Value: value,
UpdatedAt: time.Now(),
})

cachePoint, ok := device.points.Load(pointName)
if ok {
device.points.Store(pointName, DevicePoint{
Name: pointName,
Value: value,
UpdatedAt: time.Now(),
LatestWriteTime: cachePoint.(DevicePoint).LatestWriteTime,
})
} else {
device.points.Store(pointName, DevicePoint{
Name: pointName,
Value: value,
UpdatedAt: time.Now(),
})
}

// update device online status
if device.onlineBindPoint == pointName { // bind point
if online, err := parseOnlineBindPV(value); err == nil {
Expand Down Expand Up @@ -156,21 +183,38 @@ func (d *deviceShadow) GetDevicePoints(deviceId string) (points map[string]Devic
}

func (d *deviceShadow) UpdateDevicePointWriteTime(deviceId, pointName string) (err error) {
if deviceAny, ok := d.m.Load(deviceId); ok {
device := deviceAny.(Device)
if device.points == nil {
device.points = &sync.Map{}
}
deviceLock, ok := d.deviceLock.Load(deviceId)
if !ok {
return UnknownDeviceLockErr
}
lock := deviceLock.(*sync.Mutex)
lock.Lock()
defer func() {
lock.Unlock()
}()
deviceAny, ok := d.m.Load(deviceId)
if !ok {
return UnknownDeviceErr
}
device, _ := deviceAny.(Device)
if device.points == nil {
device.points = &sync.Map{}
}

p, ok := device.points.Load(pointName)
if ok {
point := p.(DevicePoint)
point.LatestWriteTime = time.Now()
device.points.Store(pointName, p)
} else {
device.points.Store(pointName, DevicePoint{
Name: pointName,
Value: nil,
LatestWriteTime: time.Now(),
})
d.m.Store(deviceId, device)
return nil
} else {
return UnknownDeviceErr
}
d.m.Store(deviceId, device)
return nil
}
func (d *deviceShadow) GetDeviceUpdateAt(deviceId string) (time.Time, error) {
if deviceAny, ok := d.m.Load(deviceId); ok {
Expand Down
2 changes: 1 addition & 1 deletion internal/core/batch_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func tryReadNewValues(deviceId string, points []plugin.PointData) {
//设备影子不存在,尝试读取
newReadPoints = append(newReadPoints, p)
} else if point.LatestWriteTime.After(checkTime) {
//在checkTime之后有发生过写行为,则本地检验可能不会生效
//在checkTime之后有发生过写行为,则本次检验可能不会生效
helper.Logger.Warn("point write success, but expect point value maybe expired", zap.String("deviceId", deviceId), zap.String("point", p.PointName), zap.Any("expect", p.Value), zap.Any("value", point.Value))
} else if fmt.Sprint(p.Value) == fmt.Sprint(point.Value) {
helper.Logger.Info("point write success, read new value", zap.String("deviceId", deviceId), zap.String("point", p.PointName), zap.Any("expect", p.Value), zap.Any("value", point.Value))
Expand Down

0 comments on commit 52706c3

Please sign in to comment.