Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support date_histogram aggregation #5

Merged
merged 4 commits into from
Sep 26, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 111 additions & 35 deletions select_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func handleSelect(sel *sqlparser.Select) (dsl string, esType string, err error)
if len(sel.GroupBy) > 0 {
aggFlag = true
querySize = "0"
//fmt.Printf("%#v\n", sel.GroupBy)
aggStr, err = buildAggs(sel)
if err != nil {
//aggStr = ""
Expand Down Expand Up @@ -249,14 +250,15 @@ func handleSelectWhere(expr *sqlparser.BoolExpr, topLevel bool, parent *sqlparse
}

// extract func expressions from select exprs
func handleSelectSelect(sqlSelect sqlparser.SelectExprs) ([]*sqlparser.FuncExpr, error) {
var res []*sqlparser.FuncExpr
func handleSelectSelect(sqlSelect sqlparser.SelectExprs) ([]*sqlparser.FuncExpr, []*sqlparser.ColName, error) {
var colArr []*sqlparser.ColName
var funcArr []*sqlparser.FuncExpr
for _, v := range sqlSelect {
// a non-star expression is either a column name
// or an aggregation function
expr, ok := v.(*sqlparser.NonStarExpr)
if !ok {
// no need to handle
// nothing to handle here: a star expression (*) can simply be skipped
continue
}

Expand All @@ -265,7 +267,7 @@ func handleSelectSelect(sqlSelect sqlparser.SelectExprs) ([]*sqlparser.FuncExpr,
switch expr.Expr.(type) {
case *sqlparser.FuncExpr:
funcExpr := expr.Expr.(*sqlparser.FuncExpr)
res = append(res, funcExpr)
funcArr = append(funcArr, funcExpr)

case *sqlparser.ColName:
continue
Expand All @@ -276,61 +278,130 @@ func handleSelectSelect(sqlSelect sqlparser.SelectExprs) ([]*sqlparser.FuncExpr,
// star expressions like * or table.* should be ignored
// because it is meaningless to set fields in elasticsearch aggs
}
return res, nil
return funcArr, colArr, nil
}

// deprecated
// extract colnames from group by
func handleSelectGroupBy(sqlGroupBy sqlparser.GroupBy) ([]*sqlparser.ColName, error) {
var res []*sqlparser.ColName
func handleSelectGroupBy(sqlGroupBy sqlparser.GroupBy) ([]*sqlparser.FuncExpr, []*sqlparser.ColName, error) {
var colArr []*sqlparser.ColName
var funcArr []*sqlparser.FuncExpr
for _, v := range sqlGroupBy {
switch v.(type) {
case *sqlparser.ColName:
colName := v.(*sqlparser.ColName)
res = append(res, colName)
colArr = append(colArr, colName)
case *sqlparser.FuncExpr:
continue
funcExpr := v.(*sqlparser.FuncExpr)
fmt.Printf("%#v\n", funcExpr)
//if funcExpr.Name == "date_histogram"
fmt.Printf("%#v\n", funcExpr.Exprs)
for _, v := range funcExpr.Exprs {
fmt.Printf("%#v\n", v)
}
funcArr = append(funcArr, funcExpr)
}
}
return res, nil
return funcArr, colArr, nil
}

// this function has become too complicated and needs refactoring
func buildAggs(sel *sqlparser.Select) (string, error) {
//the outer agg tree is built with the normal field extracted from group by
colNameArr, colErr := handleSelectGroupBy(sel.GroupBy)
//_, colNameArr, colErr := handleSelectGroupBy(sel.GroupBy)

var aggMap = make(map[string]interface{})
// point to the parent map value
var parentNode *map[string]interface{}
for idx, v := range colNameArr {
if idx == 0 {
innerMap := make(map[string]interface{})
innerMap["terms"] = map[string]interface{}{
"field": string(v.Name),
"size": 200,
}
aggMap[string(v.Name)] = innerMap
parentNode = &innerMap
} else {
innerMap := make(map[string]interface{})
innerMap["terms"] = map[string]interface{}{
"field": string(v.Name),
"size": 0,
//for idx, v := range colNameArr {
for idx, v := range sel.GroupBy {
switch v.(type) {
case *sqlparser.ColName:
colName := v.(*sqlparser.ColName)
if idx == 0 {
innerMap := make(map[string]interface{})
innerMap["terms"] = map[string]interface{}{
"field": string(colName.Name),
"size": 200, // this size may need to change ?
}
aggMap[string(colName.Name)] = innerMap
parentNode = &innerMap
} else {
innerMap := make(map[string]interface{})
innerMap["terms"] = map[string]interface{}{
"field": string(colName.Name),
"size": 0,
}
(*parentNode)["aggregations"] = map[string]interface{}{
string(colName.Name): innerMap,
}
parentNode = &innerMap
}
(*parentNode)["aggregations"] = map[string]interface{}{
string(v.Name): innerMap,
case *sqlparser.FuncExpr:
funcExpr := v.(*sqlparser.FuncExpr)
// only handle the needed
var field string
interval := "1h"
format := "yyyy-MM-dd HH:mm:ss"
//fmt.Println(string(funcExpr.Name)) date_histogram
if string(funcExpr.Name) == "date_histogram" {
innerMap := make(map[string]interface{})
//rightStr = strings.Replace(rightStr, `'`, `"`, -1)

//get field/interval and format
for _, expr := range funcExpr.Exprs {
// each expression in date_histogram must have the form a = b
switch expr.(type) {
case *sqlparser.NonStarExpr:
nonStarExpr := expr.(*sqlparser.NonStarExpr)
comparisonExpr, ok := nonStarExpr.Expr.(*sqlparser.ComparisonExpr)
if !ok {
return "", errors.New("unsupported expression in date_histogram")
}
left, ok := comparisonExpr.Left.(*sqlparser.ColName)
if !ok {
return "", errors.New("param error in date_histogram")
}
rightStr := sqlparser.String(comparisonExpr.Right)
rightStr = strings.Replace(rightStr, `'`, ``, -1)
if string(left.Name) == "field" {
field = rightStr
}
if string(left.Name) == "interval" {
interval = rightStr
}
if string(left.Name) == "format" {
format = rightStr
}

default:
return "", errors.New("unsupported expression in date_histogram")
}
}

innerMap["date_histogram"] = map[string]interface{}{
"field": field,
"interval": interval,
"format": format,
}
keyName := sqlparser.String(funcExpr)
keyName = strings.Replace(keyName, `'`, ``, -1)
keyName = strings.Replace(keyName, ` `, ``, -1)
aggMap[keyName] = innerMap
parentNode = &innerMap
}
parentNode = &innerMap
}
}

funcExprArr, funcErr := handleSelectSelect(sel.SelectExprs)
funcExprArr, _, funcErr := handleSelectSelect(sel.SelectExprs)

// the final parentNode is the exact node
// in which to nest the aggregation functions;
// every v in the loop below uses the same parentNode
var innerAggMap = make(map[string]interface{})
(*parentNode)["aggregations"] = innerAggMap
parentNode = &innerAggMap
if parentNode == nil {
return "", errors.New("agg not supported yet")
}

for _, v := range funcExprArr {
//func expressions will use the same parent bucket
Expand All @@ -340,20 +411,20 @@ func buildAggs(sel *sqlparser.Select) (string, error) {
case "count":
// count needs to distinguish * from a normal field name
if sqlparser.String(v.Exprs) == "*" {
(*parentNode)[aggName] = map[string]interface{}{
innerAggMap[aggName] = map[string]interface{}{
"value_count": map[string]string{
"field": "_index",
},
}
} else {
(*parentNode)[aggName] = map[string]interface{}{
innerAggMap[aggName] = map[string]interface{}{
"value_count": map[string]string{
"field": sqlparser.String(v.Exprs),
},
}
}
default:
(*parentNode)[aggName] = map[string]interface{}{
innerAggMap[aggName] = map[string]interface{}{
string(v.Name): map[string]string{
"field": sqlparser.String(v.Exprs),
},
Expand All @@ -362,9 +433,14 @@ func buildAggs(sel *sqlparser.Select) (string, error) {

}

if len(innerAggMap) > 0 {
(*parentNode)["aggregations"] = innerAggMap
}

mapJSON, _ := json.Marshal(aggMap)

if colErr == nil && funcErr == nil {
//if colErr == nil && funcErr == nil {
if funcErr == nil {
}

return string(mapJSON), nil
Expand Down
11 changes: 7 additions & 4 deletions select_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,13 @@ var sqlArr = []string{
"select occupy from ark_callcenter where a =1 and b = 2 and c=3",
"select occupy from ark_callcenter where create_time between '2015-01-01 00:00:00' and '2014-02-02 00:00:00'",
"select x from ark where a like '%a%'",
"select channel, count(*) as d from ark where d = 1 group by channel, count(*)",
"select id, count(*) from ark where d = 1 group by channel, count(*)",
"select id, count(id), count(*) from ark where d = 1 group by channel, id, process_id",
"SELECT sum(id),count(channel),avg(area_id),min(area_id), max(process_id), channel from g group by channel",
//"select channel, count(*) as d from ark where d = 1 group by channel, count(*)",
//"select id, count(*) from ark where d = 1 group by channel, count(*)",
//"select id, count(id), count(*) from ark where d = 1 group by channel, id, process_id",
//"SELECT sum(id),count(channel),avg(area_id),min(area_id), max(process_id), channel from g group by channel",
"select count(*) from ark group by date_histogram(field='create_time', value='1h')",
"select * from ark group by date_histogram(field='create_time', value='1h')",
"select * from ark group by date_histogram(field='create_time', value='1h'), id",
}

func TestSelect(t *testing.T) {
Expand Down