Skip to content

Commit

Permalink
First commit, added the files of the sentinel tunnel. Pass basic tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
MeirShpilraien committed May 5, 2017
1 parent ec1f175 commit b5454bd
Show file tree
Hide file tree
Showing 5 changed files with 355 additions and 0 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,7 @@ _testmain.go
*.exe
*.test
*.prof

sentinel_tunnel
log.txt
commit.txt
16 changes: 16 additions & 0 deletions sentinel_tunnel_configuration_example.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"Sentinels_addresses_list":[
"node1.local:8001",
"node2.local:8001"
],
"Databases":[
{
"Name":"db1",
"Local_port":"12345"
},
{
"Name":"db2",
"Local_port":"12346"
}
]
}
126 changes: 126 additions & 0 deletions sentinel_tunnelling_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package main

import (
// "bufio"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net"
"os"
"st_logger"
"st_sentinel_connection"
"time"
)

type SentinelTunnellingDbConfig struct {
Name string
Local_port string
}

type SentinelTunnellingConfiguration struct {
Sentinels_addresses_list []string
Databases []SentinelTunnellingDbConfig
}

type SentinelTunnellingClient struct {
configuration SentinelTunnellingConfiguration
sentinel_connection *st_sentinel_connection.Sentinel_connection
}

type get_db_address_by_name_function func(db_name string) (string, error)

func NewSentinelTunnellingClient(config_file_location string) *SentinelTunnellingClient {
data, err := ioutil.ReadFile(config_file_location)
if err != nil {
st_logger.WriteLogMessage(st_logger.FATAL, "an error has occur during configuration read",
err.Error())
}

Tunnelling_client := SentinelTunnellingClient{}
err1 := json.Unmarshal(data, &(Tunnelling_client.configuration))
if err1 != nil {
st_logger.WriteLogMessage(st_logger.FATAL, "an error has occur during configuration read,",
err1.Error())
}

var err2 error
Tunnelling_client.sentinel_connection, err2 =
st_sentinel_connection.NewSentinelConnection(Tunnelling_client.configuration.Sentinels_addresses_list)
if err2 != nil {
st_logger.WriteLogMessage(st_logger.FATAL, "an error has occur, ",
err2.Error())
}

st_logger.WriteLogMessage(st_logger.INFO, "done initializing Tunnelling")

return &Tunnelling_client
}

func createTunnelling(conn1 net.Conn, conn2 net.Conn) {
io.Copy(conn1, conn2)
conn1.Close()
conn2.Close()
}

func handleConnection(c net.Conn, db_name string,
get_db_address_by_name get_db_address_by_name_function) {
db_address, err := get_db_address_by_name(db_name)
if err != nil {
st_logger.WriteLogMessage(st_logger.ERROR, "cannot get db address for ", db_name,
",", err.Error())
c.Close()
return
}
db_conn, err1 := net.Dial("tcp", db_address)
if err1 != nil {
st_logger.WriteLogMessage(st_logger.ERROR, "cannot connect to db ", db_name,
",", err1.Error())
c.Close()
return
}
go createTunnelling(c, db_conn)
go createTunnelling(db_conn, c)
}

func handleSigleDbConnections(listening_port string, db_name string,
get_db_address_by_name get_db_address_by_name_function) {

listener, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%s", listening_port))
if err != nil {
st_logger.WriteLogMessage(st_logger.FATAL, "cannot listen to port ",
listening_port, err.Error())
}

st_logger.WriteLogMessage(st_logger.INFO, "listening on port ", listening_port,
" for connections to database: ", db_name)
for {
conn, err := listener.Accept()
if err != nil {
st_logger.WriteLogMessage(st_logger.FATAL, "cannot accept connections on port ",
listening_port, err.Error())
}
go handleConnection(conn, db_name, get_db_address_by_name)
}

}

func (st_client *SentinelTunnellingClient) Start() {
for _, db_conf := range st_client.configuration.Databases {
go handleSigleDbConnections(db_conf.Local_port, db_conf.Name,
st_client.sentinel_connection.GetAddressByDbName)
}
}

func main() {
if len(os.Args) < 3 {
fmt.Println("usage : sentinel_tunnel <config_file_path> <log_file_path>")
return
}
st_logger.InitializeLogger(os.Args[2])
st_client := NewSentinelTunnellingClient(os.Args[1])
st_client.Start()
for {
time.Sleep(1000 * time.Millisecond)
}
}
51 changes: 51 additions & 0 deletions st_logger/st_logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package st_logger

import (
"bytes"
"log"
"os"
)

var logger *log.Logger

const (
INFO = iota
ERROR = iota
FATAL = iota
DEBUG = iota
)

func InitializeLogger(log_file_path string) {
file, err := os.OpenFile(log_file_path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
if err != nil {
log.Fatalln("Failed to open log file", log_file_path, ":", err)
}
logger = log.New(file,
"",
log.Ldate|log.Ltime)
}

func WriteLogMessage(level int, message ...string) {
var buffer bytes.Buffer
if level == INFO {
buffer.WriteString("info : ")
} else if level == ERROR {
buffer.WriteString("error : ")
} else if level == FATAL {
buffer.WriteString("fatal : ")
} else if level == FATAL {
buffer.WriteString("debug : ")
}

for _, m := range message {
buffer.WriteString(m)
buffer.WriteString(" ")
}

logger.Println(buffer.String())

if level == FATAL {
logger.Println("fatal error occure commiting suicide")
os.Exit(1)
}
}
158 changes: 158 additions & 0 deletions st_sentinel_connection/st_sentinel_connection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package st_sentinel_connection

import (
"bufio"
"errors"
"fmt"
"net"
"strconv"
"time"
)

type Get_master_addr_reply struct {
reply string
err error
}

type Sentinel_connection struct {
sentinels_addresses []string
current_sentinel_connection net.Conn
reader *bufio.Reader
writer *bufio.Writer
get_master_address_by_name_reply chan *Get_master_addr_reply
get_master_address_by_name chan string
}

const (
client_closed = true
client_not_closed = false
)

func (c *Sentinel_connection) parseRequest() (request []string, err error, is_client_closed bool) {
var ret []string
buf, _, e := c.reader.ReadLine()
if e != nil {
return nil, errors.New("failed read line from client"), client_closed
}
if len(buf) == 0 {
return nil, errors.New("failed read line from client"), client_closed
}
if buf[0] != '*' {
return nil, errors.New("first char in mbulk is not *"), client_not_closed
}
mbulk_size, _ := strconv.Atoi(string(buf[1:]))
if mbulk_size == -1 {
return nil, errors.New("null request"), client_not_closed
}
ret = make([]string, mbulk_size)
for i := 0; i < mbulk_size; i++ {
buf1, _, e1 := c.reader.ReadLine()
if e1 != nil {
return nil, errors.New("failed read line from client"), client_closed
}
if len(buf1) == 0 {
return nil, errors.New("failed read line from client"), client_closed
}
if buf1[0] != '$' {
return nil, errors.New("first char in bulk is not $"), client_not_closed
}
bulk_size, _ := strconv.Atoi(string(buf1[1:]))
buf2, _, e2 := c.reader.ReadLine()
if e2 != nil {
return nil, errors.New("failed read line from client"), client_closed
}
bulk := string(buf2)
if len(bulk) != bulk_size {
return nil, errors.New("wrong bulk size"), client_not_closed
}
ret[i] = bulk
}
return ret, nil, client_not_closed
}

func (c *Sentinel_connection) getMasterAddrByNameFromSentinel(db_name string) (addr []string, returned_err error, is_client_closed bool) {
c.writer.WriteString("*3\r\n")
c.writer.WriteString("$8\r\n")
c.writer.WriteString("sentinel\r\n")
c.writer.WriteString("$23\r\n")
c.writer.WriteString("get-master-addr-by-name\r\n")
c.writer.WriteString(fmt.Sprintf("$%d\r\n", len(db_name)))
c.writer.WriteString(db_name)
c.writer.WriteString("\r\n")
c.writer.Flush()

return c.parseRequest()
}

func (c *Sentinel_connection) retrieveAddressByDbName() {
for db_name := range c.get_master_address_by_name {
addr, err, is_client_closed := c.getMasterAddrByNameFromSentinel(db_name)
if err != nil {
fmt.Println("err: ", err.Error())
if !is_client_closed {
c.get_master_address_by_name_reply <- &Get_master_addr_reply{
reply: "",
err: errors.New("failed to retrieve db name from the sentinel, db_name:" + db_name),
}
}
if !c.reconnecToSentinel() {
c.get_master_address_by_name_reply <- &Get_master_addr_reply{
reply: "",
err: errors.New("failed to connect to any of the sentinel services"),
}
}
continue
}
c.get_master_address_by_name_reply <- &Get_master_addr_reply{
reply: net.JoinHostPort(addr[0], addr[1]),
err: nil,
}
}
}

func (c *Sentinel_connection) reconnecToSentinel() bool {
for _, sentinelAddr := range c.sentinels_addresses {

if c.current_sentinel_connection != nil {
c.current_sentinel_connection.Close()
c.reader = nil
c.writer = nil
c.current_sentinel_connection = nil
}

var err error
c.current_sentinel_connection, err = net.DialTimeout("tcp", sentinelAddr, 300*time.Millisecond)
if err == nil {
c.reader = bufio.NewReader(c.current_sentinel_connection)
c.writer = bufio.NewWriter(c.current_sentinel_connection)
return true
}
fmt.Println(err.Error())
}
return false
}

func (c *Sentinel_connection) GetAddressByDbName(name string) (string, error) {
c.get_master_address_by_name <- name
reply := <-c.get_master_address_by_name_reply
return reply.reply, reply.err
}

func NewSentinelConnection(addresses []string) (*Sentinel_connection, error) {
connection := Sentinel_connection{
sentinels_addresses: addresses,
get_master_address_by_name: make(chan string),
get_master_address_by_name_reply: make(chan *Get_master_addr_reply),
current_sentinel_connection: nil,
reader: nil,
writer: nil,
}

if !connection.reconnecToSentinel() {
return nil, errors.New("could not connect to any sentinels")
}

go connection.retrieveAddressByDbName()

return &connection, nil
}

0 comments on commit b5454bd

Please sign in to comment.