Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add basic resharding record migration logic for new cluster (no GC yet) #21

Merged
merged 9 commits into from Jun 7, 2020
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
222 changes: 221 additions & 1 deletion dns/dns.go
@@ -1,8 +1,16 @@
package main

import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"strconv"
"strings"
"sync"

"github.com/miekg/dns"
)
Expand Down Expand Up @@ -202,7 +210,6 @@ func HandleSingleQuestion(name string, qType uint16, r *dns.Msg, s *dnsStore) bo
}
}
// Don't worry about *.somedomain.com for NS records, they are not supported
return false
}

// Implicitly at port 53
Expand All @@ -214,3 +221,216 @@ func serveUDPAPI(store *dnsStore) {
w.WriteMsg(res)
})
}

// jsonNsInfo is the JSON wire form of one name server in a cluster:
// its DNS name plus its glue record (the glue record's last
// space-separated field is the server's IP — see retrieveRR/migrateDNS).
type jsonNsInfo struct {
	NsName     string `json:"name"`
	GlueRecord string `json:"glue"`
}

// jsonClusterInfo is the JSON wire form of one Raft cluster: an opaque
// cluster token and the list of member name servers.
type jsonClusterInfo struct {
	ClusterToken string       `json:"cluster"`
	Members      []jsonNsInfo `json:"members"`
}

// getClusterInfo fetches the current cluster configuration from the hash
// server's HTTP API (assumed to listen on port 9121) and decodes it into
// a slice of jsonClusterInfo. Any transport, HTTP, or decode failure is
// fatal: migration cannot proceed without the cluster map.
func getClusterInfo(hashServer string) []jsonClusterInfo {
	// send HTTP request to hash server to retrieve cluster info
	addr := "http://" + hashServer + ":9121/clusterinfo"

	// A plain GET with no body; http.Get replaces the previous
	// NewRequest + empty reader + explicit ContentLength = 0.
	resp, err := http.Get(addr)
	if err != nil {
		log.Fatal(err)
	}
	// The response body was previously never closed, leaking the
	// underlying connection on every call.
	defer resp.Body.Close()

	if resp.StatusCode == http.StatusInternalServerError {
		log.Fatal("Hash server internal server error")
	}

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		log.Fatal("Failed to read cluster info response", err)
	}

	jsonClusters := make([]jsonClusterInfo, 0)
	if err := json.Unmarshal(body, &jsonClusters); err != nil {
		log.Fatal(err)
	}
	return jsonClusters
}

// disableWrites is intended to tell the hash server at hashServer to
// reject writes for the duration of the migration, returning only after
// the server acknowledges.
// NOTE(review): currently a stub — it only logs; no request is sent yet.
func disableWrites(hashServer string) {
	// send write disable request to hash server and
	// wait for ack before return
	log.Println("Hash Server write disabled")
}

// updateConfig reads the new cluster's description from the JSON config
// file at *configPath and appends it to the cluster list fetched from the
// hash server. The new cluster is therefore always the LAST element of
// the returned slice — migrateDNS and retrieveRR rely on that ordering.
// Any read or decode failure is fatal, matching the file's error style.
func updateConfig(clusters []jsonClusterInfo, configPath *string) []jsonClusterInfo {
	// ReadFile replaces the previous Open / ReadAll / deferred Close dance.
	fileContent, err := ioutil.ReadFile(*configPath)
	if err != nil {
		log.Fatal("Update config read failed: ", err)
	}

	var newCluster jsonClusterInfo
	if err := json.Unmarshal(fileContent, &newCluster); err != nil {
		log.Fatal("Update config unmarshal: ", err)
	}
	return append(clusters, newCluster)
}

// PrintClusterConfig writes a human-readable dump of the cluster list to
// stdout: each cluster token on its own line, followed by the name and
// glue record of every member, indented one tab stop.
func PrintClusterConfig(clusters []jsonClusterInfo) {
	for i := range clusters {
		fmt.Println(clusters[i].ClusterToken)
		for j := range clusters[i].Members {
			m := &clusters[i].Members[j]
			fmt.Println("\t", m.NsName)
			fmt.Println("\t", m.GlueRecord)
		}
	}
}

// sendClusterInfo PUTs the full, updated cluster list (the last entry is
// the new cluster) to destIP's /addcluster endpoint on port 9121.
// The peer signals success with 204 No Content; any transport error or
// other status is fatal.
func sendClusterInfo(clusters []jsonClusterInfo, destIP string) {
	addr := "http://" + destIP + ":9121/addcluster"
	clusterJSON, err := json.Marshal(clusters)
	if err != nil {
		log.Fatal("sendClusterInfo: ", err)
	}
	log.Println("Sending cluster info to", addr)

	req, err := http.NewRequest("PUT", addr, strings.NewReader(string(clusterJSON)))
	if err != nil {
		log.Fatal("sendClusterInfo: ", err)
	}
	req.ContentLength = int64(len(clusterJSON))

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		log.Fatal("sendClusterInfo: ", err)
	}
	// Body was previously never closed, leaking the connection.
	defer resp.Body.Close()
	// Log only after the error check: resp is nil when err != nil, so the
	// old "Received ..." line could claim a response that never arrived.
	log.Println("Received cluster info response", destIP)

	if resp.StatusCode != http.StatusNoContent {
		log.Fatal("sendClusterInfo request failed at server side")
	}
}

func retrieveRR(clusters []jsonClusterInfo, store *dnsStore) {
var wg sync.WaitGroup
// excluding our own cluster
errorC := make(chan error)
for _, cluster := range clusters[:len(clusters)-1] {
member := cluster.Members[0]
t := strings.Split(member.GlueRecord, " ")
memberIP := t[len(t)-1]
addr := "http://" + memberIP + ":9121/getrecord"
wg.Add(1)
go func(addr string, store *dnsStore, errorC chan<- error, wg *sync.WaitGroup) {
defer wg.Done()
log.Println("Start retrieving RR from", addr)
counter := 0
for {
req, err := http.NewRequest("GET", addr, strings.NewReader(strconv.Itoa(counter)))
if err != nil {
errorC <- err
return
}
req.ContentLength = int64(len(strconv.Itoa(counter)))
resp, err := http.DefaultClient.Do(req)
if err != nil {
errorC <- err
return
}

body, err := ioutil.ReadAll(resp.Body)
if err != nil {
errorC <- err
return
}
// parse body
var records []string
err = json.Unmarshal(body, &records)
if err != nil {
errorC <- err
return
}
log.Println("Received", records)
if len(records) == 1 && records[0] == "Done" {
// finished reading
log.Println("Finished reading RRs")
return
}
// go through Raft to add RRs
for _, rrString := range records {
if !checkValidRRString(rrString) {
// log.Fatal("Received bad RR")
errorC <- errors.New("Recevied bad RR")
return
}
// this is async, might cause too much traffic
store.ProposeAddRR(rrString)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a small comment to explain why locks are not needed here (due to proposals done through the channel)

log.Println("Adding RR:", rrString)
}
counter++
}
}(addr, store, errorC, &wg)
}

// check if a go thread failed
c := make(chan struct{})
go func() {
// waiting for retrieveRR go routines to finish
wg.Wait()
c <- struct{}{}
}()
select {
case <-c:
// success
return
case err := <-errorC:
// some go thread failed
log.Fatal("retrieveRR", err)
}
}

// startGarbageCollect is intended to kick off garbage collection of
// migrated records at the other clusters.
// NOTE(review): currently a stub — it only logs (the PR title says
// "no GC yet"); clusters is not used.
func startGarbageCollect(clusters []jsonClusterInfo) {
	log.Println("Garbage Collection started")
}

// enableWrites is intended to re-enable writes at the hash server once
// migration is complete (the counterpart of disableWrites).
// NOTE(review): currently a stub — it only logs; no request is sent yet.
func enableWrites(hashServer string) {
	log.Println("Enable writes at the hash server at", hashServer)
}

// migrateDNS drives the whole resharding flow for a newly added cluster:
// fetch the current cluster map from the hash server, freeze writes,
// append the new cluster from the local config, broadcast the updated
// map to every pre-existing cluster, pull all resource records into the
// local store, publish the map back to the hash server, trigger GC on
// the old clusters, and finally unfreeze writes.
func migrateDNS(store *dnsStore, hashServer *string, config *string) {
	// retrieve cluster info, then freeze writes for the migration window
	clusters := getClusterInfo(*hashServer)
	disableWrites(*hashServer)

	// append our own (new) cluster from the local config file
	clusters = updateConfig(clusters, config)
	PrintClusterConfig(clusters)

	// Push the new map to every pre-existing cluster; the final entry is
	// the cluster being added, so it is excluded. The destination IP is
	// the last space-separated field of the first member's glue record.
	existing := clusters[:len(clusters)-1]
	for _, c := range existing {
		fields := strings.Split(c.Members[0].GlueRecord, " ")
		sendClusterInfo(clusters, fields[len(fields)-1])
	}

	// pull all RRs into the local store, then publish the new map to the
	// hash server itself
	retrieveRR(clusters, store)
	sendClusterInfo(clusters, *hashServer)

	// kick off garbage collection at the other clusters and re-enable writes
	startGarbageCollect(clusters)
	enableWrites(*hashServer)
}
5 changes: 5 additions & 0 deletions dns/dns_store.go
Expand Up @@ -11,6 +11,7 @@ import (
"sync"
"time"

"github.com/buraksezer/consistent"
"github.com/coreos/etcd/snap"

"github.com/miekg/dns"
Expand Down Expand Up @@ -48,6 +49,10 @@ type dnsStore struct {
// for sending http Cache requests
cluster []string
id int
// for adding new cluster and migration
config []jsonClusterInfo
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This differs from what I originally thought of (with state temporarily maintained in dedicated thread instead of becoming part of the store state), but I'll also probably give this a pass for initial attempt.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that definitely sounds better, no need to change the store state.

// to compute if a key should be migrated
lookup *consistent.Consistent
}

func newDNSStore(snapshotter *snap.Snapshotter, proposeC chan<- string, commitC <-chan *commitInfo, errorC <-chan error, cluster []string, id int) *dnsStore {
Expand Down
Binary file removed dns/hash_server/hash_server
Binary file not shown.
74 changes: 70 additions & 4 deletions dns/hash_server/hash_server.go
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"flag"
"fmt"
"io"
"io/ioutil"
"log"
"math/rand"
Expand Down Expand Up @@ -215,6 +216,69 @@ func serveHashServerHTTPAPI(store *hashServerStore, port int, done chan<- error)
w.WriteHeader(resp.StatusCode)
})

router.HandleFunc("/clusterinfo", func(w http.ResponseWriter, r *http.Request) {
if r.Method != "GET" {
http.Error(w, "Method has to be PUT", http.StatusBadRequest)
return
}

// form a list clusters, each cluster is a list of nsInfo of nodes in the cluster

file, err := os.Open(*configFile)
if err != nil {
log.Fatal(err)
}
defer file.Close()
fileContent, err := ioutil.ReadAll(file)
if err != nil {
log.Fatal(err)
}

io.WriteString(w, string(fileContent))
return
})

// probably updateconfig is a better name
// the current name is consistent with httpapi.go
router.HandleFunc("/addcluster", func(w http.ResponseWriter, r *http.Request) {
log.Println("add cluster")
if r.Method != "PUT" {
http.Error(w, "Method has to be PUT", http.StatusBadRequest)
return
}

body, err := ioutil.ReadAll(r.Body)
if err != nil {
log.Printf("Cannot read /addcluster body: %v\n", err)
http.Error(w, "Bad PUT body", http.StatusBadRequest)
return
}

jsonClusters := make([]jsonClusterInfo, 0)
err = json.Unmarshal(body, &jsonClusters)
if err != nil {
log.Fatal(err)
}

// update cluster info in dnsStore
store.clusters = intoClusterMap(jsonClusters)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Locks here too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes boss

// update consistent
cfg := consistent.Config{
PartitionCount: len(store.clusters),
ReplicationFactor: 2, // We are forced to have number larger than 1
Load: 3,
Hasher: hasher{},
}
log.Println("Received cluster update, setting new config")
store.lookup = consistent.New(nil, cfg)
for _, cluster := range store.clusters {
store.lookup.Add(clusterToken(cluster.token))
log.Println("New cluster:", cluster.token)
}

w.WriteHeader(http.StatusNoContent)
})

go func() {
if err := http.ListenAndServe(":"+strconv.Itoa(port), router); err != nil {
done <- err
Expand Down Expand Up @@ -253,7 +317,7 @@ func sendDNSMsgUntilSuccess(m *dns.Msg, servers []*nsInfo) (*dns.Msg, error) {
}

// Returns true if through direct query we have all answers. Otherwise return false
func tryDirectQuery(store *hashServerStore, batchList []batchedDNSQuestions, msg *dns.Msg) bool {
func tryDirectQuery(store *hashServerStore, batchList []batchedDNSQuestions, msg *dns.Msg, rd bool) bool {
var wg sync.WaitGroup

var answerLock sync.Mutex // protects hasAllAnswers and msg
Expand All @@ -266,7 +330,7 @@ func tryDirectQuery(store *hashServerStore, batchList []batchedDNSQuestions, msg
defer wg.Done()

m := new(dns.Msg)
m.RecursionDesired = true // Also seek recursive. See comment below for behavior implications
m.RecursionDesired = rd // Also seek recursive. See comment below for behavior implications
m.Question = *batch.questions

// This will try all servers one by one on preference order until depletion of the list
Expand Down Expand Up @@ -425,7 +489,7 @@ func serveHashServerUDPAPI(store *hashServerStore) {
mDirect := new(dns.Msg)
mDirect.Id = r.Id
mDirect.Question = append(mDirect.Question, r.Question...)
if tryDirectQuery(store, batchList, mDirect) {
if tryDirectQuery(store, batchList, mDirect, r.RecursionDesired) {
w.WriteMsg(mDirect)
return
}
Expand All @@ -445,14 +509,16 @@ func serveHashServerUDPAPI(store *hashServerStore) {
// The makeshift design is to have the server stop and does the migration manually.
// However in real-world deployment we need to implement transparent migration without killing servers.

var configFile *string

func main() {
rand.Seed(time.Now().Unix())

store := hashServerStore{
clusters: make(map[clusterToken]clusterInfo),
}

configFile := flag.String("config", "", "filename to load initial config")
configFile = flag.String("config", "", "filename to load initial config")
flag.Parse()

if err := loadConfig(&store, *configFile); err != nil {
Expand Down