Skip to content

Commit

Permalink
added caching function based on redis
Browse files Browse the repository at this point in the history
  • Loading branch information
alash3al committed Jan 11, 2019
1 parent e265eac commit a8b4d85
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 16 deletions.
48 changes: 48 additions & 0 deletions README.md
Expand Up @@ -14,6 +14,7 @@ Table Of Contents
- [Supported Utils](#supported-utils)
- [REST vs RESP](#rest-vs-resp)
- [Data Transformation](#data-transformation)
- [Aggregators](#aggregators)
- [Issue/Suggestion/Contribution ?](#issuesuggestioncontribution)
- [Author](#author)
- [License](#license)
Expand Down Expand Up @@ -231,6 +232,53 @@ databases {
}
```
Aggregators
============
> `SQLer` helps you to merge multiple macros into one to minimize the API calls number, see the example bellow
```hcl
databases {
exec = "SHOW DATABASES"
transformer = <<JS
// there is a global variable called `$result`,
// `$result` holds the result of the sql execution.
(function(){
newResult = []
for ( i in $result ) {
newResult.push($result[i].Database)
}
return newResult
})()
JS
}
tables {
exec = "SELECT `table_schema` as `database`, `table_name` as `table` FROM INFORMATION_SCHEMA.tables"
transformer = <<JS
(function(){
$ret = {}
for ( i in $result ) {
if ( ! $ret[$result[i].database] ) {
$ret[$result[i].database] = [];
}
$ret[$result[i].database].push($result[i].table)
}
return $ret
})()
JS
}
databasesAndTables {
aggregate {
databases = "current_databases"
tables = "current_tables"
}
}
```
Issue/Suggestion/Contribution ?
===============================
`SQLer` is your software, feel free to open an issue with your feature(s), suggestions, ... etc, also you can easily contribute even you aren't a `Go` developer, you can write wikis it is open for all, let's make `SQLer` more powerful.
Expand Down
68 changes: 68 additions & 0 deletions cacher.go
@@ -0,0 +1,68 @@
package main

import (
"time"

"github.com/go-redis/redis"
"github.com/vmihailenco/msgpack"
)

// Cacher - represents a cacher
type Cacher struct {
redis *redis.Client
}

// NewCacher - initialize a new cacher
func NewCacher(redisaddr string) (*Cacher, error) {
opts, err := redis.ParseURL(redisaddr)
if err != nil {
return nil, err
}

c := new(Cacher)
c.redis = redis.NewClient(opts)

if _, err := c.redis.Ping().Result(); err != nil {
return nil, err
}

return c, nil
}

// Put - put a new item into cache
func (c *Cacher) Put(k string, v interface{}, ttl int64, tags []string) {
k = "sqler:cache:value:" + k
data, _ := msgpack.Marshal(v)

c.redis.Set(k, string(data), time.Duration(ttl)*time.Second)

for _, tag := range tags {
tag = "sqler:cache:tag:" + tag
c.redis.SAdd(tag, k)
}
}

// Get - fetch the data of the specified key
func (c *Cacher) Get(k string) interface{} {
k = "sqler:cache:value:" + k
if c.redis.Exists(k).Val() < 1 {
return nil
}

encodedVal := c.redis.Get(k).Val()

var data interface{}

msgpack.Unmarshal([]byte(encodedVal), &data)

return data
}

// ClearTagged - clear cached data tagged with the specified tag
func (c *Cacher) ClearTagged(tag string) {
tag = "sqler:cache:tag:" + tag

for _, k := range c.redis.SMembers(tag).Val() {
c.redis.Del(k)
}
}
6 changes: 4 additions & 2 deletions config.example.hcl
Expand Up @@ -102,14 +102,16 @@ tables {
JS
}

// a mcro that aggregates `databases` macro and `tables` macro into one macro
databasesAndTables {
aggregate {
databases = "current_databases"
tables = "current_tables"
}

cache {
ttl = 3600
link = ["addUser"]
ttl = 100
link = ["adduser"]
ignoreInput = true
}
}
10 changes: 10 additions & 0 deletions init.go
Expand Up @@ -59,4 +59,14 @@ func init() {
os.Exit(0)
}
}

if *flagREDISAddr != "" {
var err error

cacher, err = NewCacher(*flagREDISAddr)
if err != nil {
fmt.Println(color.RedString("redis - (%s)", err.Error()))
os.Exit(0)
}
}
}
70 changes: 56 additions & 14 deletions macro.go
Expand Up @@ -2,13 +2,16 @@ package main

import (
"bytes"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"strings"

"github.com/vmihailenco/msgpack"

"github.com/dop251/goja"
"github.com/jmoiron/sqlx"
)
Expand All @@ -21,12 +24,30 @@ type Macro struct {
Exec string `json:"exec"`
Aggregate map[string]string `json:"aggregate"`
Transformer string `json:"transformer"`
name string
manager *Manager
Cache struct {
TTL int64 `json:"ttl"`
Link []string `json:"link"`
IgnoreInput bool `json:"ignore_input"`
} `json:"cache"`
name string
manager *Manager
}

// Call - executes the macro
func (m *Macro) Call(input map[string]interface{}) (interface{}, error) {
cacheKey := m.name
if !m.Cache.IgnoreInput && len(input) > 0 {
cacheKey += ":" + m.encodeInput(input)
}

go cacher.ClearTagged(m.name)

if m.Cache.TTL > 0 {
if cachedValue := cacher.Get(cacheKey); cachedValue != nil {
return cachedValue, nil
}
}

ctx := NewContext()
ctx.SQLArgs = make(map[string]interface{})
ctx.Input = input
Expand All @@ -36,21 +57,36 @@ func (m *Macro) Call(input map[string]interface{}) (interface{}, error) {
return errs, errors.New("validation errors")
}

var out interface{}
var err error

if len(m.Aggregate) > 0 {
return m.aggregate(ctx)
}
out, err = m.aggregate(ctx)
if err != nil {
return err.Error(), err
}
} else {
src, err := m.compileMacro(ctx)
if err != nil {
return err.Error(), err
}

src, err := m.compileMacro(ctx)
if err != nil {
return err.Error(), err
out, err = m.execSQLQuery(strings.Split(src, ";"), ctx.SQLArgs)
if err != nil {
return err.Error(), err
}

out, err = m.execTransformer(out, m.Transformer)
if err != nil {
return err.Error(), err
}
}

out, err := m.execSQLQuery(strings.Split(src, ";"), ctx.SQLArgs)
if err != nil {
return err.Error(), err
if m.Cache.TTL > 0 {
cacher.Put(cacheKey, out, m.Cache.TTL, m.Cache.Link)
}

return m.execTransformer(out, m.Transformer)
return out, nil
}

// compileMacro - compile the specified macro and pass the specified ctx
Expand Down Expand Up @@ -92,7 +128,6 @@ func (m *Macro) execSQLQuery(sqls []string, args map[string]interface{}) (interf
continue
}
if _, err := conn.NamedExec(sql, args); err != nil {
fmt.Println("....")
return nil, err
}
}
Expand Down Expand Up @@ -148,9 +183,10 @@ func (m *Macro) scanSQLRow(rows *sqlx.Rows) (map[string]interface{}, error) {

// execTransformer - run the transformer function
func (m *Macro) execTransformer(data interface{}, transformer string) (interface{}, error) {
if transformer == "" {
if strings.TrimSpace(transformer) == "" {
return data, nil
}

vm := goja.New()

vm.Set("$result", data)
Expand All @@ -160,7 +196,7 @@ func (m *Macro) execTransformer(data interface{}, transformer string) (interface
return nil, err
}

return v, nil
return v.Export(), nil
}

// aggregate - run the aggregators
Expand All @@ -180,3 +216,9 @@ func (m *Macro) aggregate(ctx *Context) (map[string]interface{}, error) {
}
return ret, nil
}

// encodeInput - encode the input as a string
func (m *Macro) encodeInput(in map[string]interface{}) string {
k, _ := msgpack.Marshal(in)
return hex.EncodeToString(k)
}
2 changes: 2 additions & 0 deletions vars.go
Expand Up @@ -16,12 +16,14 @@ var (
flagAPIFile = flag.String("config", "./config.example.hcl", "the config file(s) that contains your endpoints configs, it accepts comma seprated list of glob style pattern")
flagRESTListenAddr = flag.String("rest", ":8025", "the http restful api listen address")
flagRESPListenAddr = flag.String("resp", ":3678", "the resp (redis protocol) server listen address")
flagREDISAddr = flag.String("redis", "redis://localhost:6379/1", "redis server address, used for caching purposes")
flagWorkers = flag.Int("workers", runtime.NumCPU(), "the maximum workers count")
)

var (
macrosManager *Manager
snow *snowflake.Node
cacher *Cacher
)

const (
Expand Down

0 comments on commit a8b4d85

Please sign in to comment.