Skip to content

Commit

Permalink
global config + transport codec
Browse files Browse the repository at this point in the history
  • Loading branch information
ailidani committed Feb 16, 2018
1 parent c43c6a9 commit 99f28de
Show file tree
Hide file tree
Showing 41 changed files with 614 additions and 680 deletions.
13 changes: 11 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
language: go

sudo: false

go:
- 1.x
install: go get github.com/ailidani/paxi
script: cd bin/; ./build.sh

install:
- go get ./...

script:
- go install ./server/
- go install ./client/
- go install ./cmd/

notifications:
email: false
16 changes: 9 additions & 7 deletions atomic/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

type state int

// states of each instance
const (
GetPhase state = iota
SetPhase
Expand All @@ -32,9 +33,10 @@ type Replica struct {
version map[paxi.Key]int
}

func NewReplica(config paxi.Config) *Replica {
// NewReplica generates ABD replica
func NewReplica(id paxi.ID) *Replica {
r := new(Replica)
r.Node = paxi.NewNode(config)
r.Node = paxi.NewNode(id)
r.log = make(map[int]*entry)
r.version = make(map[paxi.Key]int)
r.Register(paxi.Request{}, r.handleRequest)
Expand All @@ -61,7 +63,7 @@ func (r *Replica) handleRequest(m paxi.Request) {
version: version,
}
r.log[r.cid].getQuorum.ACK(r.ID())
r.Broadcast(&Get{
r.Broadcast(Get{
ID: r.ID(),
CID: r.cid,
Key: k,
Expand All @@ -70,7 +72,7 @@ func (r *Replica) handleRequest(m paxi.Request) {

func (r *Replica) handleGet(m Get) {
v := r.Node.Get(m.Key)
r.Send(m.ID, &GetReply{
r.Send(m.ID, GetReply{
ID: r.ID(),
CID: m.CID,
Key: m.Key,
Expand All @@ -85,7 +87,7 @@ func (r *Replica) handleSet(m Set) {
r.Node.Put(m.Key, m.Value)
r.version[m.Key] = m.Version
}
r.Send(m.ID, &SetReply{
r.Send(m.ID, SetReply{
ID: r.ID(),
CID: m.CID,
Key: m.Key,
Expand All @@ -109,7 +111,7 @@ func (r *Replica) handleGetReply(m GetReply) {
e.state = SetPhase // into set phase
e.setQuorum.ACK(r.ID())
if e.r.Command.IsRead() {
r.Broadcast(&Set{
r.Broadcast(Set{
ID: r.ID(),
CID: m.CID,
Key: m.Key,
Expand All @@ -122,7 +124,7 @@ func (r *Replica) handleGetReply(m GetReply) {
// write new value to local database first
r.Node.Put(e.r.Command.Key, e.r.Command.Value)
r.version[m.Key] = e.version
r.Broadcast(&Set{
r.Broadcast(Set{
ID: r.ID(),
CID: m.CID,
Key: e.r.Command.Key,
Expand Down
28 changes: 16 additions & 12 deletions benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/ailidani/paxi/log"
)

// DB is general interface implemented by client to call client library
type DB interface {
Init()
Read(key int) int
Expand Down Expand Up @@ -41,13 +42,14 @@ type bconfig struct {
Speed int // moving speed in milliseconds intervals per key

// zipfian distribution
Zipfian_s float64 // zipfian s parameter
Zipfian_v float64 // zipfian v parameter
ZipfianS float64 // zipfian s parameter
ZipfianV float64 // zipfian v parameter

Throttle int // requests per second throttle
}

func NewBenchmarkConfig() bconfig {
// NewBenchmarkConfig returns a default benchmark config
func newBenchmarkConfig() bconfig {
return bconfig{
T: 10,
N: 0,
Expand All @@ -62,8 +64,8 @@ func NewBenchmarkConfig() bconfig {
Sigma: 60,
Move: false,
Speed: 500,
Zipfian_s: 2,
Zipfian_v: 1,
ZipfianS: 2,
ZipfianV: 1,
}
}

Expand All @@ -87,6 +89,7 @@ func (c *bconfig) Save() error {
return encoder.Encode(c)
}

// Benchmark is benchmarking tool that generates workload and collects operation history and latency
type Benchmark struct {
db DB // read/write operation interface
bconfig
Expand All @@ -98,10 +101,11 @@ type Benchmark struct {
zipf *rand.Zipf
}

// NewBenchmark returns new Benchmark object given implementation of DB interface
func NewBenchmark(db DB) *Benchmark {
b := new(Benchmark)
b.db = db
b.bconfig = NewBenchmarkConfig()
b.bconfig = newBenchmarkConfig()
b.History = NewHistory()
return b
}
Expand All @@ -110,7 +114,7 @@ func NewBenchmark(db DB) *Benchmark {
func (b *Benchmark) Run() {
rand.Seed(time.Now().UTC().UnixNano())
r := rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
b.zipf = rand.NewZipf(r, b.Zipfian_s, b.Zipfian_v, uint64(b.K))
b.zipf = rand.NewZipf(r, b.ZipfianS, b.ZipfianV, uint64(b.K))

var stop chan bool
if b.Move {
Expand Down Expand Up @@ -154,17 +158,17 @@ func (b *Benchmark) Run() {

log.Infof("Benchmark took %v\n", t)
log.Infof("Throughput %f\n", float64(len(b.latency))/t.Seconds())
log.Infoln(stat)
log.Info(stat)

stat.WriteFile("latency" + "." + string(GetID()))
b.History.WriteFile("history" + "." + string(GetID()))
stat.WriteFile("latency")
b.History.WriteFile("history")

if b.LinearizabilityCheck {
n := b.History.Linearizable()
if n == 0 {
log.Infoln("The execution is linearizable.")
log.Info("The execution is linearizable.")
} else {
log.Infoln("The execution is NOT linearizable.")
log.Info("The execution is NOT linearizable.")
log.Infof("Total anomaly read operations are %d", n)
log.Infof("Anomaly percentage is %f", float64(n)/float64(stat.Size))
}
Expand Down
4 changes: 2 additions & 2 deletions bin/build.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/usr/bin/env bash
go build ../master/
#go build ../master/
go build ../server/
go build ../client/
go build ../cmd/
go build ../cmd/
2 changes: 1 addition & 1 deletion bin/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
"thrifty": false,
"chan_buffer_size": 1024,
"buffer_size": 1024,
"transport": "udp",
"transport": "tcp",
"codec": "gob",
"reply_when_commit": false
}
4 changes: 2 additions & 2 deletions bin/simulation.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ if [ -z "${PID}" ]; then
go build ../server/
go build ../client/
go build ../cmd/
./server -log_dir=logs -log_level=debug -simulation &
./server -log_dir=logs -log_level=debug &
echo $! >> ${PID_FILE}
sleep 3
./client > c1 &
./client -log_level=debug > c1 &
echo $! >> ${PID_FILE}
# ./client -id 2.1 > c2 &
# echo $! >> ${PID_FILE}
Expand Down
1 change: 0 additions & 1 deletion bin/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ PID=$(cat "${PID_FILE}");

if [ -z "${PID}" ]; then
echo "Process id for servers is written to location: {$PID_FILE}"
go build ../master/
go build ../server/
go build ../client/
go build ../cmd/
Expand Down
69 changes: 69 additions & 0 deletions blockchain/block.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package blockchain

import (
"bytes"
"crypto/sha256"
"encoding/binary"
"encoding/gob"
"math"

"github.com/ailidani/paxi/log"
)

// PREFIX is the leading zeros in hash value as proof of work
var PREFIX = []byte("0000")

func init() {
gob.Register(Block{})
}

// Block contains some amount of data in blockchain
type Block struct {
Index uint64
Nonce uint64
Data []byte
Prev []byte // previous block hash
Hash []byte // current block hash

next *Block
}

// Next generates the next block given some data
func (b *Block) Next(data []byte) *Block {
next := &Block{
Index: b.Index + 1,
Data: data,
Prev: b.Hash,
}
b.next = next
h := sha256.New()
h.Write(next.bytes())
for i := uint64(0); i <= math.MaxUint64; i++ {
t := h
err := binary.Write(t, binary.LittleEndian, i)
if err != nil {
log.Error("binary write failed: ", err)
return nil
}
thash := t.Sum(nil)
if bytes.HasPrefix(thash, PREFIX) {
next.Nonce = i
next.Hash = thash
log.Debugf("Nonce found %d", i)
return next
}
}
return nil
}

func (b *Block) bytes() []byte {
buf := new(bytes.Buffer)
err := binary.Write(buf, binary.LittleEndian, b.Index)
if err != nil {
log.Error("binary write failed: ", err)
return nil
}
buf.Write(b.Data)
buf.Write(b.Prev)
return buf.Bytes()
}
50 changes: 50 additions & 0 deletions blockchain/miner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package blockchain

import (
"bytes"
"encoding/gob"

"github.com/ailidani/paxi"
"github.com/ailidani/paxi/log"
)

// Miner is the maintainer of blockchain
type Miner struct {
paxi.Node

blockchain map[uint64]*Block
chain *Block
requests map[uint64]*paxi.Request
index uint64
}

// NewMiner creates new Miner as paxi node
func NewMiner(id paxi.ID) *Miner {
miner := &Miner{
Node: paxi.NewNode(id),
blockchain: make(map[uint64]*Block),
}
miner.Node.Register(paxi.Request{}, miner.handleRequest)
miner.Node.Register(paxi.Request{}, miner.handleBlock)
return miner
}

func (m *Miner) handleRequest(r paxi.Request) {
buf := new(bytes.Buffer)
encoder := gob.NewEncoder(buf)
err := encoder.Encode(r.Command)
if err != nil {
log.Error("gob encode error: ", err)
return
}
block := m.blockchain[m.index].Next(buf.Bytes())
m.blockchain[block.Index] = block
m.requests[block.Index] = &r
m.index = block.Index
r.Reply(paxi.Reply{
Command: r.Command,
})
}

func (m *Miner) handleBlock(b Block) {
}
4 changes: 2 additions & 2 deletions checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ func (a operation) equal(b operation) bool {
return a.input == b.input && a.output == b.output && a.start == b.start && a.end == b.end
}

func (o operation) String() string {
return fmt.Sprintf("{input=%v, output=%v, start=%d, end=%d}", o.input, o.output, o.start, o.end)
func (a operation) String() string {
return fmt.Sprintf("{input=%v, output=%v, start=%d, end=%d}", a.input, a.output, a.start, a.end)
}

type checker struct {
Expand Down
21 changes: 11 additions & 10 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ type Client struct {
}

// NewClient creates a new Client from config
func NewClient(config Config) *Client {
func NewClient(id ID) *Client {
return &Client{
ID: config.ID,
N: len(config.Addrs),
addrs: config.Addrs,
http: config.HTTPAddrs,
algorithm: config.Algorithm,
ID: id,
N: len(Config.Addrs),
addrs: Config.Addrs,
http: Config.HTTPAddrs,
algorithm: Config.Algorithm,
}
}

Expand All @@ -50,12 +50,13 @@ func (c *Client) rest(id ID, key Key, value Value) Value {
}
r, err := http.NewRequest(method, url, body)
if err != nil {
// TODO should return error for the operation
log.Error(err)
return nil
}
r.Header.Set(HttpClientID, string(c.ID))
r.Header.Set(HttpCommandID, strconv.Itoa(c.cid))
r.Header.Set(HttpTimestamp, strconv.FormatInt(time.Now().UnixNano(), 10))
r.Header.Set(HTTPClientID, string(c.ID))
r.Header.Set(HTTPCommandID, strconv.Itoa(c.cid))
r.Header.Set(HTTPTimestamp, strconv.FormatInt(time.Now().UnixNano(), 10))
res, err := http.DefaultClient.Do(r)
if err != nil {
log.Error(err)
Expand Down Expand Up @@ -121,7 +122,7 @@ func (c *Client) json(id ID, key Key, value Value) Value {
data, err := json.Marshal(cmd)
res, err := http.Post(url, "json", bytes.NewBuffer(data))
if err != nil {
log.Errorln(err)
log.Error(err)
return nil
}
defer res.Body.Close()
Expand Down
Loading

0 comments on commit 99f28de

Please sign in to comment.