Skip to content

Commit

Permalink
Refactor Code For Subcribe And UnSubcribe
Browse files Browse the repository at this point in the history
  • Loading branch information
0xmerman committed Jun 18, 2019
1 parent a60a455 commit 005b785
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 80 deletions.
13 changes: 13 additions & 0 deletions common/hash.go
Expand Up @@ -216,3 +216,16 @@ func ConvertArrayStringToArrayHash(strs []string) ([]*Hash, error) {
}
return hashes, nil
}

func HashArrayInterface(target interface{}) (Hash, error) {
arr := InterfaceSlice(target)
temp := []byte{0}
for value := range arr {
valueBytes, err := json.Marshal(&value)
if err != nil {
return Hash{}, err
}
temp = append(temp, valueBytes...)
}
return HashH(temp), nil
}
Binary file modified config
Binary file not shown.
16 changes: 7 additions & 9 deletions database/lvdb/epochreward.go
@@ -1,8 +1,6 @@
package lvdb

import (
"fmt"

"github.com/incognitochain/incognito-chain/common"
)

Expand All @@ -11,37 +9,37 @@ func (db *db) AddShardRewardRequest(
shardID byte,
rewardAmount uint64,
) error {
fmt.Printf("[ndh]-[DATABASE] AddShardRewardRequest- - - %+v %+v %+v\n", epoch, shardID, rewardAmount)
//fmt.Printf("[ndh]-[DATABASE] AddShardRewardRequest- - - %+v %+v %+v\n", epoch, shardID, rewardAmount)
key, err := NewKeyAddShardRewardRequest(epoch, shardID)
if err != nil {
return err
}
oldValue, err := db.Get(key)
if err != nil {
fmt.Printf("[ndh]-[ERROR] AddShardRewardRequest 0- - - %+v\n", err)
//fmt.Printf("[ndh]-[ERROR] AddShardRewardRequest 0- - - %+v\n", err)
err1 := db.Put(key, common.Uint64ToBytes(rewardAmount))
fmt.Printf("[ndh]-[ERROR] AddShardRewardRequest 1- - - %+v\n", err1)
//fmt.Printf("[ndh]-[ERROR] AddShardRewardRequest 1- - - %+v\n", err1)
if err1 != nil {
return err1
}
} else {
newValue := common.BytesToUint64(oldValue)
newValue += rewardAmount
err = db.Put(key, common.Uint64ToBytes(newValue))
fmt.Printf("[ndh]-[ERROR] AddShardRewardRequest 2- - - %+v\n", err)
//fmt.Printf("[ndh]-[ERROR] AddShardRewardRequest 2- - - %+v\n", err)
}
return nil
}

func (db *db) GetRewardOfShardByEpoch(epoch uint64, shardID byte) (uint64, error) {
fmt.Printf("[ndh]-[DATABASE] GetRewardOfShardByEpoch- - - %+v %+v\n", epoch, shardID)
//fmt.Printf("[ndh]-[DATABASE] GetRewardOfShardByEpoch- - - %+v %+v\n", epoch, shardID)
key, _ := NewKeyAddShardRewardRequest(epoch, shardID)
rewardAmount, err := db.Get(key)
if err != nil {
fmt.Printf("[ndh]-[ERROR] 1 --- %+v\n", err)
//fmt.Printf("[ndh]-[ERROR] 1 --- %+v\n", err)
return 0, nil
}
fmt.Printf("[ndh] - - - %+v\n", rewardAmount)
//fmt.Printf("[ndh] - - - %+v\n", rewardAmount)
return common.BytesToUint64(rewardAmount), nil
}

Expand Down
5 changes: 4 additions & 1 deletion rpcserver/request.go
Expand Up @@ -45,10 +45,13 @@ func parseJsonRequest(rawMessage []byte) (*JsonRequest, error) {
return &request, nil
}
}

//type for subcribe and unsubcribe
// 0: subcribe
// 1: unsubcribe
type SubcriptionRequest struct {
JsonRequest JsonRequest `json:"Request"`
Subcription string `json:"Subcription"`
Type int `json:"Type"`
}

func parseSubcriptionRequest(rawMessage []byte) (*SubcriptionRequest, error) {
Expand Down
195 changes: 125 additions & 70 deletions rpcserver/websocket.go
Expand Up @@ -3,6 +3,7 @@ package rpcserver
import (
"errors"
"github.com/gorilla/websocket"
"github.com/incognitochain/incognito-chain/common"
"net"
"net/http"
"sync"
Expand All @@ -26,7 +27,13 @@ type RpcSubResult struct {
Result interface{}
Error *RPCError
}

// Manage All Subcription from one socket connection
type SubcriptionManager struct {
wsMtx sync.RWMutex
subMtx sync.RWMutex
subRequestList map[string]map[common.Hash]chan struct{} // String: Subcription Method, Hash: hash from Subcription Params
ws *websocket.Conn
}
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
Expand All @@ -35,7 +42,12 @@ var upgrader = websocket.Upgrader{
func (wsServer *WsServer) Init(config *RpcServerConfig) {
wsServer.config = *config
}

func NewSubcriptionManager(ws *websocket.Conn) *SubcriptionManager{
return &SubcriptionManager{
subRequestList: make(map[string]map[common.Hash]chan struct{}),
ws: ws,
}
}
// Start is used by rpcserver.go to start the rpc listener.
func (wsServer *WsServer) Start() error {
if atomic.AddInt32(&wsServer.started, 1) != 1 {
Expand Down Expand Up @@ -100,13 +112,30 @@ func (wsServer *WsServer) handleWsRequest(w http.ResponseWriter, r *http.Request
}
wsServer.ProcessRpcWsRequest(ws)
}

func (wsServer *WsServer) limitWsConnections(w http.ResponseWriter, remoteAddr string) bool {
if int(atomic.LoadInt32(&wsServer.numWsClients)+1) > wsServer.config.RPCMaxWSClients {
Logger.log.Infof("Max RPC Web Socket exceeded [%d] - "+
"disconnecting client %s", wsServer.config.RPCMaxClients,
remoteAddr)
http.Error(w, "503 Too busy. Try again later.",
http.StatusServiceUnavailable)
return true
}
return false
}
func (wsServer *WsServer) IncrementWsClients() {
atomic.AddInt32(&wsServer.numWsClients, 1)
}
func (wsServer *WsServer) DecrementWsClients() {
atomic.AddInt32(&wsServer.numWsClients, -1)
}
func (wsServer *WsServer) ProcessRpcWsRequest(ws *websocket.Conn) {
if atomic.LoadInt32(&wsServer.shutdown) != 0 {
return
}
defer ws.Close()
var wsMtx sync.Mutex
// one sub manager will manage connection and subcription with one client (one websocket connection)
subManager := NewSubcriptionManager(ws)
for {
msgType, msg, err := ws.ReadMessage()
if err != nil {
Expand All @@ -118,80 +147,106 @@ func (wsServer *WsServer) ProcessRpcWsRequest(ws *websocket.Conn) {
continue
}
}
subcriptionRequest, jsonErr := parseSubcriptionRequest(msg)
subRequest, jsonErr := parseSubcriptionRequest(msg)
if jsonErr == nil {
go func(subcriptionRequest *SubcriptionRequest) {
var cResult chan RpcSubResult
var closeChan = make(chan struct{})
defer close(closeChan)
var jsonErr error
request := subcriptionRequest.JsonRequest
if request.Id == nil && !(wsServer.config.RPCQuirks && request.Jsonrpc == "") {
return
}
// Attempt to parse the JSON-RPC request into a known concrete command.
command := WsHandler[request.Method]
if command == nil {
jsonErr = NewRPCError(ErrRPCMethodNotFound, errors.New("Method"+request.Method+"Not found"))
Logger.log.Errorf("RPC from client %+v error %+v", ws.RemoteAddr(), jsonErr)
//Notify user, method not found
res, err := createMarshalledSubResponse(subcriptionRequest, nil, jsonErr)
if err != nil {
Logger.log.Errorf("Failed to marshal reply: %s", err.Error())
return
}
wsMtx.Lock()
if err := ws.WriteMessage(msgType, res); err != nil {
Logger.log.Errorf("Failed to write reply message: %+v", err)
wsMtx.Unlock()
return
}
wsMtx.Unlock()
return
} else {
cResult = make(chan RpcSubResult)
// Run RPC websocket method
go command(wsServer, request.Params, subcriptionRequest.Subcription, cResult, closeChan)
// when rpc method has result, it will deliver it to this channel
for subResult := range cResult {
result := subResult.Result
jsonErr := subResult.Error
res, err := createMarshalledSubResponse(subcriptionRequest, result, jsonErr)
if err != nil {
Logger.log.Errorf("Failed to marshal reply: %s", err.Error())
return
}
wsMtx.Lock()
if err := ws.WriteMessage(msgType, res); err != nil {
Logger.log.Errorf("Failed to write reply message: %+v", err)
wsMtx.Unlock()
return
}
wsMtx.Unlock()
}
return
}
}(subcriptionRequest)
if subRequest.Type == 0 {
go wsServer.Subcribe(subManager, subRequest, msgType)
}
if subRequest.Type == 1 {
//TODO: unsubcribe
}
} else {
Logger.log.Errorf("RPC function process with err \n %+v", jsonErr)
}
}
}

func (wsServer *WsServer) IncrementWsClients() {
atomic.AddInt32(&wsServer.numWsClients, 1)
func (wsServer *WsServer) Subcribe(subManager *SubcriptionManager, subRequest *SubcriptionRequest, msgType int) {
var cResult chan RpcSubResult
var closeChan = make(chan struct{})
defer func() {
close(closeChan)
}()
var jsonErr error
request := subRequest.JsonRequest
if request.Id == nil && !(wsServer.config.RPCQuirks && request.Jsonrpc == "") {
return
}
// Attempt to parse the JSON-RPC request into a known concrete command.
command := WsHandler[request.Method]
if command == nil {
jsonErr = NewRPCError(ErrRPCMethodNotFound, errors.New("Method"+request.Method+"Not found"))
Logger.log.Errorf("RPC from client %+v error %+v", subManager.ws.RemoteAddr(), jsonErr)
//Notify user, method not found
res, err := createMarshalledSubResponse(subRequest, nil, jsonErr)
if err != nil {
Logger.log.Errorf("Failed to marshal reply: %s", err.Error())
return
}
subManager.wsMtx.Lock()
if err := subManager.ws.WriteMessage(msgType, res); err != nil {
Logger.log.Errorf("Failed to write reply message: %+v", err)
subManager.wsMtx.Unlock()
return
}
subManager.wsMtx.Unlock()
return
} else {
cResult = make(chan RpcSubResult)
// push this subcription to subcription list
err := AddSubcription(subManager, subRequest, closeChan)
if err != nil {
Logger.log.Errorf("Json Params Hash Error %+v, Closing Websocket from Client %+v \n", err, subManager.ws.RemoteAddr())
close(cResult)
return
}
// Run RPC websocket method
go command(wsServer, request.Params, subRequest.Subcription, cResult, closeChan)
// when rpc method has result, it will deliver it to this channel
for subResult := range cResult {
result := subResult.Result
jsonErr := subResult.Error
res, err := createMarshalledSubResponse(subRequest, result, jsonErr)
if err != nil {
Logger.log.Errorf("Failed to marshal reply: %s", err.Error())
break
}
subManager.wsMtx.Lock()
if err := subManager.ws.WriteMessage(msgType, res); err != nil {
Logger.log.Errorf("Failed to write reply message: %+v", err)
subManager.wsMtx.Unlock()
break
}
subManager.wsMtx.Unlock()
}
return
}
}
func (wsServer *WsServer) DecrementWsClients() {
atomic.AddInt32(&wsServer.numWsClients, -1)
func RemoveSubcription(subManager *SubcriptionManager, subRequest *SubcriptionRequest) error {
subManager.subMtx.Lock()
defer subManager.subMtx.Unlock()
hash, err := common.HashArrayInterface(subRequest.JsonRequest.Params)
if err != nil {
return err
}
if paramsList, ok := subManager.subRequestList[subRequest.JsonRequest.Method]; ok {
if _, ok := paramsList[hash]; ok {
delete(paramsList, hash)
}
}
return nil
}
func (wsServer *WsServer) limitWsConnections(w http.ResponseWriter, remoteAddr string) bool {
if int(atomic.LoadInt32(&wsServer.numWsClients)+1) > wsServer.config.RPCMaxWSClients {
Logger.log.Infof("Max RPC Web Socket exceeded [%d] - "+
"disconnecting client %s", wsServer.config.RPCMaxClients,
remoteAddr)
http.Error(w, "503 Too busy. Try again later.",
http.StatusServiceUnavailable)
return true
func AddSubcription(subManager *SubcriptionManager, subRequest *SubcriptionRequest, closeChan chan struct{}) error {
subManager.subMtx.Lock()
defer subManager.subMtx.Unlock()
hash, err := common.HashArrayInterface(subRequest.JsonRequest.Params)
if err != nil {
return err
}
return false
if _, ok := subManager.subRequestList[subRequest.JsonRequest.Method]; !ok {
subManager.subRequestList[subRequest.JsonRequest.Method] = make(map[common.Hash]chan struct{})
}
subManager.subRequestList[subRequest.JsonRequest.Method][hash] = closeChan
return nil
}

0 comments on commit 005b785

Please sign in to comment.