From dfc53448c5f21ee3d3ed8f8d35025a787a938cdc Mon Sep 17 00:00:00 2001 From: Mohamed Al Ashaal Date: Wed, 9 Jan 2019 15:19:53 +0200 Subject: [PATCH] re-organized the code and added resp server --- README.md | 3 + context.go | 1 + init.go | 3 + macro.go | 138 ++++++++++++++++++++ main.go | 31 +++-- manager.go | 150 +++------------------- routes.go | 14 +- server_resp.go | 107 +++++++++++++++ middleware_authorize.go => server_rest.go | 21 +++ vars.go | 11 +- 10 files changed, 318 insertions(+), 161 deletions(-) create mode 100644 macro.go create mode 100644 server_resp.go rename middleware_authorize.go => server_rest.go (73%) diff --git a/README.md b/README.md index 36dcbaf..a0929ec 100644 --- a/README.md +++ b/README.md @@ -26,6 +26,7 @@ Configuration Overview // create a macro/endpoint called "_boot", // this macro is private "used within other macros" // because it starts with "_". +// this rule only used within `RESTful` context. _boot { // the query we want to execute exec = < 0 { + return errs, errors.New("validation errors") + } + + src, err := m.compileMacro(ctx) + if err != nil { + return nil, err + } + + return m.execSQLQuery(strings.Split(src, ";"), ctx.SQLArgs) +} + +// compileMacro - compile the specified macro and pass the specified ctx +func (m *Macro) compileMacro(ctx *Context) (string, error) { + if m.compiled.Lookup(m.name) == nil { + return "resource not found", errors.New("resource not found") + } + + var buf bytes.Buffer + + rw := io.ReadWriter(&buf) + if err := m.compiled.ExecuteTemplate(rw, m.name, ctx); err != nil { + return "", err + } + + src, err := ioutil.ReadAll(rw) + if err != nil { + return "", err + } + + if len(src) < 1 { + return "", errors.New("empty resource") + } + + return strings.Trim(strings.TrimSpace(string(src)), ";"), nil +} + +// execSQLQuery - execute the specified sql query +func (m *Macro) execSQLQuery(sqls []string, args map[string]interface{}) (interface{}, error) { + conn, err := sqlx.Open(*flagDBDriver, *flagDBDSN) + if err != nil { + return nil, err + } + defer conn.Close() + + for _, sql := range sqls[0 : len(sqls)-1] { + sql = strings.TrimSpace(sql) + if "" == sql { + continue + } + if _, err := conn.NamedExec(sql, args); err != nil { + fmt.Println("....") + return nil, err + } + } + + rows, err := conn.NamedQuery(sqls[len(sqls)-1], args) + if err != nil { + return nil, err + } + defer rows.Close() + + ret := []map[string]interface{}{} + + for rows.Next() { + row, err := m.scanSQLRow(rows) + if err != nil { + continue + } + ret = append(ret, row) + } + + return interface{}(ret), nil +} + +// scanSQLRow - scan a row from the specified rows +func (m *Macro) scanSQLRow(rows *sqlx.Rows) (map[string]interface{}, error) { + row := make(map[string]interface{}) + if err := rows.MapScan(row); err != nil { + return nil, err + } + + for k, v := range row { + if nil == v { + continue + } + + switch v.(type) { + case []uint8: + v = []byte(v.([]uint8)) + default: + v, _ = json.Marshal(v) + } + + var d interface{} + if nil == json.Unmarshal(v.([]byte), &d) { + row[k] = d + } else { + row[k] = string(v.([]byte)) + } + } + + return row, nil +} + +// func (m *Macro) execJS() diff --git a/main.go b/main.go index 88a70d7..a8b5971 100644 --- a/main.go +++ b/main.go @@ -5,27 +5,30 @@ package main import ( "fmt" + "strconv" "github.com/alash3al/go-color" - - "github.com/labstack/echo" - "github.com/labstack/echo/middleware" ) func main() { - e := echo.New() - e.HideBanner = true + fmt.Println(color.MagentaString(sqlerBrand)) + fmt.Printf("⇨ sqler server version: %s \n", color.GreenString(sqlerVersion)) + fmt.Printf("⇨ sqler used dsn is %s \n", color.GreenString(*flagDBDSN)) + fmt.Printf("⇨ sqler workers count: %s \n", color.GreenString(strconv.Itoa(*flagWorkers))) + fmt.Printf("⇨ sqler resp server available at: %s \n", color.GreenString(*flagRESPListenAddr)) + fmt.Printf("⇨ sqler rest server available at: %s \n", color.GreenString(*flagRESTListenAddr)) - e.Pre(middleware.RemoveTrailingSlash()) - e.Use(middleware.CORS()) - e.Use(middleware.GzipWithConfig(middleware.GzipConfig{Level: 9})) - e.Use(middleware.Recover()) + err := make(chan error) - e.GET("/", routeIndex) - e.Any("/:macro", routeExecMacro, middlewareAuthorize) + go (func() { + err <- initRESPServer() + })() - fmt.Println(color.MagentaString(sqlerBrand)) - fmt.Printf("⇨ used dsn is %s \n", color.GreenString(*flagDBDSN)) + go (func() { + err <- initRESTServer() + })() - color.Red(e.Start(*flagListenAddr).Error()) + if err := <-err; err != nil { + color.Red(err.Error()) + } } diff --git a/manager.go b/manager.go index b2e3ce4..154614c 100644 --- a/manager.go +++ b/manager.go @@ -4,38 +4,25 @@ package main import ( - "bytes" - "encoding/json" - "errors" - "fmt" - "io" "io/ioutil" "path/filepath" - "strings" "text/template" "github.com/hashicorp/hcl" - "github.com/jmoiron/sqlx" ) -// Macro - a macro configuration -type Macro struct { - Authorizers []string `json:"authorizers"` - Methods []string `json:"method"` - Rules map[string][]string `json:"rules"` - Exec string `json:"exec"` -} - // Manager - a macros manager type Manager struct { - configs map[string]*Macro - macros *template.Template + macros map[string]*Macro + compiled *template.Template } // NewManager - initialize a new manager func NewManager(configpath string) (*Manager, error) { manager := new(Manager) - manager.configs = make(map[string]*Macro) + manager.macros = make(map[string]*Macro) + manager.compiled = template.New("main") + files, _ := filepath.Glob(configpath) for _, file := range files { @@ -50,133 +37,34 @@ func NewManager(configpath string) (*Manager, error) { } for k, v := range config { - manager.configs[k] = v - } - } - - manager.macros = template.New("main") - for k, v := range manager.configs { - _, err := manager.macros.New(k).Parse(v.Exec) - if err != nil { - return nil, err + manager.macros[k] = v + _, err := manager.compiled.New(k).Parse(v.Exec) + if err != nil { + return nil, err + } + v.compiled = manager.compiled + v.name = k } } return manager, nil } -// Call - call the specified macro -func (m *Manager) Call(macro string, input map[string]interface{}) (interface{}, error) { - ctx := NewContext() - ctx.SQLArgs = make(map[string]interface{}) - ctx.Input = input - - src, err := m.compileMacro(macro, ctx) - if err != nil { - return nil, err - } - - return m.execSQLQuery(strings.Split(src, ";"), ctx.SQLArgs) -} - // Get - fetches the required macro func (m *Manager) Get(macro string) *Macro { - return m.configs[macro] + return m.macros[macro] } // Size - return the size of the currently loaded configs func (m *Manager) Size() int { - return len(m.configs) -} - -// compileMacro - compile the specified macro and pass the specified ctx -func (m *Manager) compileMacro(macro string, ctx *Context) (string, error) { - if m.macros.Lookup(macro) == nil { - return "", errors.New("resource not found #1") - } - - var buf bytes.Buffer - rw := io.ReadWriter(&buf) - if err := m.macros.ExecuteTemplate(rw, macro, ctx); err != nil { - return "", err - } - - src, err := ioutil.ReadAll(rw) - if err != nil { - return "", err - } - - if len(src) < 1 { - return "", errors.New("resource not found #2") - } - - return strings.Trim(strings.TrimSpace(string(src)), ";"), nil + return len(m.macros) } -// execSQLQuery - execute the specified sql query -func (m *Manager) execSQLQuery(sqls []string, args map[string]interface{}) (interface{}, error) { - conn, err := sqlx.Open(*flagDBDriver, *flagDBDSN) - if err != nil { - return nil, err - } - defer conn.Close() - - for _, sql := range sqls[0 : len(sqls)-1] { - sql = strings.TrimSpace(sql) - if "" == sql { - continue - } - if _, err := conn.NamedExec(sql, args); err != nil { - fmt.Println("....") - return nil, err - } - } - - rows, err := conn.NamedQuery(sqls[len(sqls)-1], args) - if err != nil { - return nil, err - } - defer rows.Close() - - ret := []map[string]interface{}{} - - for rows.Next() { - row, err := m.scanSQLRow(rows) - if err != nil { - continue - } - ret = append(ret, row) - } - - return interface{}(ret), nil -} - -// scanSQLRow - scan a row from the specified rows -func (m *Manager) scanSQLRow(rows *sqlx.Rows) (map[string]interface{}, error) { - row := make(map[string]interface{}) - if err := rows.MapScan(row); err != nil { - return nil, err - } - - for k, v := range row { - if nil == v { - continue - } - - switch v.(type) { - case []uint8: - v = []byte(v.([]uint8)) - default: - v, _ = json.Marshal(v) - } - - var d interface{} - if nil == json.Unmarshal(v.([]byte), &d) { - row[k] = d - } else { - row[k] = string(v.([]byte)) - } +// List - return a list of registered macros +func (m *Manager) List() (ret []string) { + for k := range m.macros { + ret = append(ret, k) } - return row, nil + return ret } diff --git a/routes.go b/routes.go index 434d92e..0942cfd 100644 --- a/routes.go +++ b/routes.go @@ -31,21 +31,11 @@ func routeExecMacro(c echo.Context) error { input[k] = v } - if len(macro.Rules) > 0 { - result := Validate(input, macro.Rules) - if len(result) > 0 { - return c.JSON(422, map[string]interface{}{ - "success": false, - "errors": result, - }) - } - } - - out, err := macrosManager.Call(c.Param("macro"), input) + out, err := macro.Call(input) if err != nil { return c.JSON(500, map[string]interface{}{ "success": false, - "error": err.Error(), + "error": out, }) } diff --git a/server_resp.go b/server_resp.go new file mode 100644 index 0000000..738b2fa --- /dev/null +++ b/server_resp.go @@ -0,0 +1,107 @@ +package main + +import ( + "encoding/json" + "fmt" + "strings" + + "github.com/tidwall/redcon" +) + +func initRESPServer() error { + return redcon.ListenAndServe( + *flagRESPListenAddr, + func(conn redcon.Conn, cmd redcon.Command) { + // handles any panic + defer (func() { + if err := recover(); err != nil { + conn.WriteError(fmt.Sprintf("fatal error: %s", (err.(error)).Error())) + } + })() + + // normalize the todo action "command" + // normalize the command arguments + todo := strings.TrimSpace(string(cmd.Args[0])) + todoNormalized := strings.ToLower(todo) + args := []string{} + for _, v := range cmd.Args[1:] { + v := strings.TrimSpace(string(v)) + args = append(args, v) + } + + // internal command to pick a database + if todoNormalized == "select" { + conn.WriteString("OK") + return + } + + // internal ping-pong + if todoNormalized == "ping" { + conn.WriteString("PONG") + return + } + + // ECHO + if todoNormalized == "echo" { + conn.WriteString(strings.Join(args, " ")) + return + } + + // HELP|INFO|LIST + if todoNormalized == "list" || todoNormalized == "help" || todoNormalized == "info" { + conn.WriteArray(macrosManager.Size()) + for _, v := range macrosManager.List() { + conn.WriteBulkString(v) + } + } + + // close the connection + if todoNormalized == "quit" { + conn.WriteString("OK") + conn.Close() + return + } + + macro := macrosManager.Get(todo) + if nil == macro { + conn.WriteError("not found") + conn.Close() + return + } + + var input map[string]interface{} + if len(args) > 0 { + json.Unmarshal([]byte(args[0]), &input) + } + + // handle our command + commandExecMacro(conn, macro, input) + }, + func(conn redcon.Conn) bool { + conn.SetContext(map[string]interface{}{}) + return true + }, + nil, + ) +} + +// commandExecMacro - resp command handler +func commandExecMacro(conn redcon.Conn, macro *Macro, input map[string]interface{}) { + out, err := macro.Call(input) + if err != nil { + conn.WriteArray(2) + conn.WriteInt(0) + + j, _ := json.Marshal(out) + + conn.WriteBulk(j) + + return + } + + jsonOUT, _ := json.Marshal(out) + + conn.WriteArray(2) + conn.WriteInt(1) + conn.WriteBulk(jsonOUT) +} diff --git a/middleware_authorize.go b/server_rest.go similarity index 73% rename from middleware_authorize.go rename to server_rest.go index 330d60e..53903e5 100644 --- a/middleware_authorize.go +++ b/server_rest.go @@ -1,3 +1,6 @@ +// Copyright 2018 The SQLer Authors. All rights reserved. +// Use of this source code is governed by a Apache 2.0 +// license that can be found in the LICENSE file. package main import ( @@ -6,8 +9,26 @@ import ( "github.com/go-resty/resty" "github.com/labstack/echo" + "github.com/labstack/echo/middleware" ) +// initialize RESTful server +func initRESTServer() error { + e := echo.New() + e.HideBanner = true + e.HidePort = true + + e.Pre(middleware.RemoveTrailingSlash()) + e.Use(middleware.CORS()) + e.Use(middleware.GzipWithConfig(middleware.GzipConfig{Level: 9})) + e.Use(middleware.Recover()) + + e.GET("/", routeIndex) + e.Any("/:macro", routeExecMacro, middlewareAuthorize) + + return e.Start(*flagRESTListenAddr) +} + // middlewareAuthorize - the authorizer middleware func middlewareAuthorize(next echo.HandlerFunc) echo.HandlerFunc { return func(c echo.Context) error { diff --git a/vars.go b/vars.go index bb053ee..87218f7 100644 --- a/vars.go +++ b/vars.go @@ -5,15 +5,18 @@ package main import ( "flag" + "runtime" "github.com/bwmarrin/snowflake" ) var ( - flagDBDriver = flag.String("engine", "mysql", "the sql engine/driver to be used") - flagDBDSN = flag.String("dsn", "root:root@tcp(127.0.0.1)/test?multiStatements=true", "the data source name for the selected engine") - flagAPIFile = flag.String("api", "./api.example.hcl", "the validators used before processing the sql, it accepts a glob style pattern") - flagListenAddr = flag.String("listen", ":8025", "the rest api listen address") + flagDBDriver = flag.String("engine", "mysql", "the sql engine/driver to be used") + flagDBDSN = flag.String("dsn", "root:root@tcp(127.0.0.1)/test?multiStatements=true", "the data source name for the selected engine") + flagAPIFile = flag.String("api", "./api.example.hcl", "the validators used before processing the sql, it accepts a glob style pattern") + flagRESTListenAddr = flag.String("rest", ":8025", "the rest api listen address") + flagRESPListenAddr = flag.String("resp", ":3678", "the rest api listen address") + flagWorkers = flag.Int("workers", runtime.NumCPU(), "the maximum workers count") ) var (