Skip to content
Permalink
Browse files

address comments from harmony-ek

  • Loading branch information...
chaosma committed Dec 30, 2018
1 parent 30fef7b commit 90b17a423a9a35779b60574b48deb317057b1d3d
Showing with 67 additions and 71 deletions.
  1. +4 −2 cmd/broadcast/README.md
  2. +28 −40 cmd/broadcast/main.go
  3. +31 −26 cmd/broadcast/utils.go
  4. +4 −3 internal/ida/coopcast/interface.go
@@ -1,7 +1,7 @@
## Benchmark test for coopcast

#### What is Coopcast
Coopcast is a fast, scalable and fault resilient network protocol to deliver messages to a group of nodes in the network.
Coopcast (cooperative multicast) is a fast, scalable and fault resilient network protocol to deliver messages to a group of nodes in the network.
The sender uses RaptorQ encoder (RFC6330) to encode the message and broadcasts small pieces of the encoded message (called Symbol) to its neighbor peers. Each peer who receives the Symbol will also responsible to relay it to its own neighbor peers, that's why we call it Coop(erate) Cast.

##### Fast
@@ -28,9 +28,11 @@ It depends on the following library:
###### Build go executable
./build.sh

###### Generate network topology (generate a network of 5 nodes, fully connected)
###### Generate network topology
./generate_configs.sh graph1.txt

It generates a network of 5 nodes, fully connected. The first line of graph1.txt is the number of nodes in the network. The rest lines describe the neighborhood of a given node. For example, if a line is 0 1 2 3, it means the node 0 will have 3 outgoing/neighbor peers which are node 1, node 2 and node 3

###### Start 4 server nodes (0,1,2,3) waiting for receiving messages
./start_server 5 [coopcast|manycast]

@@ -8,64 +8,67 @@ import (
"log"
"math/rand"
"net"
"strconv"
"time"
)

// InitCoopCastNode creates a coopcast node
func InitCoopCastNode(confignbr string, configallpeer string, t0 float64, t1 float64, t2 float64, base float64, hop int) *coopcast.Node {
func initCoopCastNode(confignbr string, configallpeer string, t0 float64, t1 float64, t2 float64, base float64, hop int) *coopcast.Node {
rand.Seed(time.Now().UTC().UnixNano())
config1 := NewConfig()
err := config1.ReadConfigFile(confignbr)
if err != nil {
log.Printf("unable to read config file %v", confignbr)
return nil
}
selfPeer, peerList, _ := config1.GetPeerInfo()
config2 := NewConfig()
err = config2.ReadConfigFile(configallpeer)
if err != nil {
log.Printf("unable to read config file %v", configallpeer)
return nil
}

_, _, allPeers := config2.GetPeerInfo()
Cache := make(map[coopcast.HashKey]*coopcast.RaptorQImpl)
SenderCache := make(map[coopcast.HashKey]bool)
PeerDecodedCounter := make(map[coopcast.HashKey]map[int]int)
node := coopcast.Node{SelfPeer: selfPeer, PeerList: peerList, AllPeers: allPeers, Cache: Cache, PeerDecodedCounter: PeerDecodedCounter, SenderCache: SenderCache, InitialDelayTime: t0, MaxDelayTime: t1, ExpBase: base, RelayTime: t2, Hop: hop}
cache := make(map[coopcast.HashKey]*coopcast.RaptorQImpl)
senderCache := make(map[coopcast.HashKey]bool)
peerDecodedCounter := make(map[coopcast.HashKey]map[int]int)
node := coopcast.Node{SelfPeer: selfPeer, PeerList: peerList, AllPeers: allPeers, Cache: cache, PeerDecodedCounter: peerDecodedCounter, SenderCache: senderCache, InitialDelayTime: t0, MaxDelayTime: t1, ExpBase: base, RelayTime: t2, Hop: hop}
return &node
}

// InitManyCastNode creates a manycast node
func InitManyCastNode(confignbr string, configallpeer string) *manycast.Node {
func initManyCastNode(confignbr string, configallpeer string) *manycast.Node {
config1 := NewConfig()
err := config1.ReadConfigFile(confignbr)
if err != nil {
log.Printf("unable to read config file %v", confignbr)
return nil
}
selfPeer, peerList, _ := config1.GetPeerInfo()
config2 := NewConfig()
err = config2.ReadConfigFile(configallpeer)
if err != nil {
log.Printf("unable to read config file %v", configallpeer)
return nil
}
_, _, allPeers := config2.GetPeerInfo()
node := manycast.Node{SelfPeer: selfPeer, PeerList: peerList, AllPeers: allPeers}
return &node
}

func main() {
rand.Seed(time.Now().UnixNano())

graphConfigFile := flag.String("graph_config", "graph0.txt", "file containing network structure")
generateConfigFiles := flag.Bool("gen_config", false, "whether to generate config files from graph_config file")
broadCast := flag.Bool("broadcast", false, "whether to broadcast a message")
msgFile := flag.String("msg_file", "test.txt", "message file to broadcast")
configFile := flag.String("nbr_config", "configs/config_0.txt", "config file contains neighbor peers")
allPeerFile := flag.String("all_config", "configs/config_allpeers.txt", "config file contains all peer nodes info")
mode := flag.String("mode", "coopcast", "choose broadcast testing mode, [coopcast|manycast]")
t0 := flag.String("t0", "5", "initial delay time for symbol broadcasting")
t1 := flag.String("t1", "50", "uppper bound delay time for symbol broadcasting")
t2 := flag.String("t2", "7", "delay time for symbol relay")
hop := flag.String("hop", "1", "number of hops")
base := flag.String("base", "1.05", "base of exponential increase of symbol broadcasting delay")
t0 := flag.Float64("t0", 5, "initial delay time for symbol broadcasting")
t1 := flag.Float64("t1", 50, "uppper bound delay time for symbol broadcasting")
t2 := flag.Float64("t2", 7, "delay time for symbol relay")
hop := flag.Int("hop", 1, "number of hops")
base := flag.Float64("base", 1.05, "base of exponential increase of symbol broadcasting delay")
flag.Parse()

if *generateConfigFiles {
@@ -75,60 +78,45 @@ func main() {

switch *mode {
case "coopcast":
var ta, tb, tc, b float64
var h int
var err error
if ta, err = strconv.ParseFloat(*t0, 64); err != nil {
log.Printf("unable to parse t0 %v with error %v", t0, err)
return
}
if tb, err = strconv.ParseFloat(*t1, 64); err != nil {
log.Printf("unable to parse t1 %v with error %v", t1, err)
return
}
if tc, err = strconv.ParseFloat(*t2, 64); err != nil {
log.Printf("unable to parse t2 %v with error %v", t2, err)
return
}
if b, err = strconv.ParseFloat(*base, 64); err != nil {
log.Printf("unable to parse base %v with error %v", base, err)
node := initCoopCastNode(*configFile, *allPeerFile, *t0, *t1, *t2, *base, *hop)
if node == nil {
log.Printf("unable to create node")
return
}
if h, err = strconv.Atoi(*hop); err != nil {
log.Printf("unable to parse hop %v with error %v", hop, err)
return
}
node := InitCoopCastNode(*configFile, *allPeerFile, ta, tb, tc, b, h)
uaddr := net.JoinHostPort("", node.SelfPeer.UDPPort)
pc, err := net.ListenPacket("udp", uaddr)
if err != nil {
log.Printf("cannot connect to udp port")
log.Printf("cannot listen on udp port")
return
}
log.Printf("server start listening on udp port %s", node.SelfPeer.UDPPort)

if *broadCast {
go node.ListeningOnBroadCast(pc)
filecontent, err := ioutil.ReadFile(*msgFile)
log.Printf("file size is %v", len(filecontent))
if err != nil {
log.Printf("cannot open file %s", *msgFile)
return
}
log.Printf("file size is %v", len(filecontent))
cancels, raptorq := node.BroadCast(filecontent, pc)
node.StopBroadCast(cancels, raptorq)
} else {
node.ListeningOnBroadCast(pc)
}
case "manycast":
node := InitManyCastNode(*configFile, *allPeerFile)
node := initManyCastNode(*configFile, *allPeerFile)
if node == nil {
log.Printf("unable to create node")
return
}
if *broadCast {
filecontent, err := ioutil.ReadFile(*msgFile)
log.Printf("file size is %v", len(filecontent))
if err != nil {
log.Printf("cannot open file %s", *msgFile)
return
}
log.Printf("file size is %v", len(filecontent))
node.BroadCast(filecontent)
} else {
node.ListeningOnUniCast()
@@ -10,24 +10,30 @@ import (
"os"
"strconv"
"strings"
"time"
)

const pubKeySize int = 20
type Role int

// Entry is a single config of a node.
type Entry struct {
Sid string
const (
Self Role = 0
Neighbor Role = 1
All Role = 2
pubKeySize int = 20
)

// PeerConfig is a single config of a node.
type PeerConfig struct {
Sid string // SimpleID, might be replaced later for more generic ID like byte array
IP string
TCPPort string
UDPPort string
PubKey string
Role string
}

// Config is a struct containing multiple Entry of all nodes.
// Config is a struct containing network topolgy, i.e. multiple PeerConfig of all nodes.
type Config struct {
config []Entry
config []PeerConfig
}

// NewConfig returns a pointer to a Config.
@@ -37,19 +43,16 @@ func NewConfig() *Config {
}

// GetPeerInfo returns the selfPeer, peerList, allPeers from config instance, which used to create node instance
func (config *Config) GetPeerInfo() (ida.Peer, []ida.Peer, []ida.Peer) {
var allPeers []ida.Peer
var peerList []ida.Peer
var selfPeer ida.Peer
func (config *Config) GetPeerInfo() (selfPeer ida.Peer, peerList []ida.Peer, allPeers []ida.Peer) {
for _, entry := range config.config {
sid, err := strconv.Atoi(entry.Sid)
if err != nil {
log.Printf("cannot convert sid")
}
peer := ida.Peer{IP: entry.IP, TCPPort: entry.TCPPort, UDPPort: entry.UDPPort, PubKey: entry.PubKey, Sid: sid}
if entry.Role == "0" {
if entry.Role == "self" {
selfPeer = peer
} else if entry.Role == "1" {
} else if entry.Role == "neighbor" {
peerList = append(peerList, peer)
allPeers = append(allPeers, peer)
} else {
@@ -59,7 +62,7 @@ func (config *Config) GetPeerInfo() (ida.Peer, []ida.Peer, []ida.Peer) {
return selfPeer, peerList, allPeers
}

// ReadConfigFile parses the config file and return a 2d array containing the file data
// ReadConfigFile parses the config file and return an error
func (config *Config) ReadConfigFile(filename string) error {
file, err := os.Open(filename)
if err != nil {
@@ -69,10 +72,14 @@ func (config *Config) ReadConfigFile(filename string) error {
defer file.Close()
fscanner := bufio.NewScanner(file)

result := []Entry{}
result := []PeerConfig{}
for fscanner.Scan() {
p := strings.Split(fscanner.Text(), " ")
entry := Entry{p[0], p[1], p[2], p[3], p[4], p[5]}
if len(p) != 6 {
log.Printf("incorrect format, need 6 columns, but actually have %v columns", len(p))
return nil
}
entry := PeerConfig{p[0], p[1], p[2], p[3], p[4], p[5]}
result = append(result, entry)
}
config.config = result
@@ -111,7 +118,6 @@ func initConfig(n int) (map[int][]byte, []int, []int) {
}
defer f.Close()

rand.Seed(time.Now().UnixNano())
udpport := 10000
tcpport := 20000
udps := make([]int, n)
@@ -129,7 +135,7 @@ func initConfig(n int) (map[int][]byte, []int, []int) {
log.Printf("unable to create random number")
}
pubkey := hex.EncodeToString(buf)
line = line + pubkey + " 2\n"
line = line + pubkey + " all\n"
tcps[i] = tcpport
udps[i] = udpport
pubkeys[i] = buf
@@ -141,23 +147,22 @@ func initConfig(n int) (map[int][]byte, []int, []int) {
}

func writeGraphRelationToConfig(p []string, n int, pubkeys map[int][]byte, tcps []int, udps []int) {
idx, err := strconv.Atoi(p[0])
if err != nil {
log.Printf("cannot convert index %v", p[0])
return
}
filename := "configs/config_" + p[0] + ".txt"
f, err := os.Create(filename)
if err != nil {
log.Printf("cannot create file %v", filename)
return
}
defer f.Close()
var idx int
idx, err = strconv.Atoi(p[0])
if err != nil {
log.Printf("cannot convert index %v", p[0])
return
}
ts := strconv.Itoa(tcps[idx])
us := strconv.Itoa(udps[idx])
sid := strconv.Itoa(idx)
line := sid + " 127.0.0.1 " + ts + " " + us + " " + hex.EncodeToString(pubkeys[idx]) + " 0\n"
line := sid + " 127.0.0.1 " + ts + " " + us + " " + hex.EncodeToString(pubkeys[idx]) + " self\n"
io.WriteString(f, line)
for _, v := range p[1:] {
idx, err = strconv.Atoi(v)
@@ -167,7 +172,7 @@ func writeGraphRelationToConfig(p []string, n int, pubkeys map[int][]byte, tcps
ts := strconv.Itoa(tcps[idx])
us := strconv.Itoa(udps[idx])
sid := strconv.Itoa(idx)
line := sid + " 127.0.0.1 " + ts + " " + us + " " + hex.EncodeToString(pubkeys[idx]) + " 1\n"
line := sid + " 127.0.0.1 " + ts + " " + us + " " + hex.EncodeToString(pubkeys[idx]) + " neighbor\n"
io.WriteString(f, line)
}
}
@@ -2,6 +2,7 @@ package coopcast

import (
"context"
"crypto/sha1"
libraptorq "github.com/harmony-one/go-raptorq/pkg/raptorq"
"net"
"sync"
@@ -15,10 +16,10 @@ const (
cacheClearInterval time.Duration = 250 // clear cache every xx seconds
enforceClearInterval int64 = 300 // clear old cache eventually
udpCacheSize int = 2 * 1024
normalChunkSize int = 100 * 1200
symbolSize int = 1200 // must be multiple of Al(=4) required by RFC6330
normalChunkSize int = 100 * symbolSize

hashSize int = 20 // sha1 hash size
hashSize int = sha1.Size
threshold float32 = 0.8 // threshold rate of number of neighors decode message successfully
)

@@ -50,7 +51,7 @@ type Node struct {
Cache map[HashKey]*RaptorQImpl
PeerDecodedCounter map[HashKey]map[int]int

mux sync.Mutex
mux sync.Mutex // mutex protect the concurrent write to the map in node, but not protect the fields in RaptorQimpl
}

// RaptorQImpl represents raptorQ structure holding necessary information for encoding and decoding message

0 comments on commit 90b17a4

Please sign in to comment.
You can’t perform that action at this time.