Skip to content

Commit

Permalink
Merge pull request #106 from kl7sn/main
Browse files Browse the repository at this point in the history
fix: log.Fatal
  • Loading branch information
kl7sn committed Mar 25, 2022
2 parents 9144d36 + b6af4d9 commit 7ef1516
Show file tree
Hide file tree
Showing 18 changed files with 481 additions and 137 deletions.
25 changes: 18 additions & 7 deletions api/internal/service/inquiry/builder/builder.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,24 @@
package builder

type Builder struct {
SQL *SQL
}
import (
"github.com/shimohq/mogo/api/internal/service/inquiry/builder/bumo"
)

func (b *Builder) New() {
b.SQL = new(SQL)
type Builder interface {
NewProject(params bumo.Params)
BuilderCreate()
BuilderFields()
BuilderWhere()
BuilderEngine()
BuilderOrder()
BuilderTTL()
BuilderSetting()
GetResult() interface{}
}

func (b *Builder) Where() {
b.SQL.Where = "1=1"
func Standalone(params bumo.Params, builder Builder) string {
director := new(Director)
director.SetBuilder(builder)
obj := director.Generate(params)
return obj.Gen()
}
90 changes: 90 additions & 0 deletions api/internal/service/inquiry/builder/builder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package builder

import (
"testing"

"github.com/shimohq/mogo/api/internal/service/inquiry/builder/bumo"
"github.com/shimohq/mogo/api/internal/service/inquiry/builder/standalone"
)

func TestStandaloneData(t *testing.T) {
type args struct {
params bumo.Params
builder Builder
}
tests := []struct {
name string
args args
want string
}{
// TODO: Add test cases.
{
name: "test-1",
args: args{
builder: new(standalone.DataBuilder),
params: bumo.Params{
TableName: "dev.app_stdout",
Days: 3,
TimeTyp: "",
Brokers: "",
Topic: "",
Group: "",
ConsumerNum: 0,
},
},
want: `CREATE TABLE dev.app_stdout
(
_time_second_ DateTime,
_time_nanosecond_ DateTime64(9, 'Asia/Shanghai'),
_source_ String,
_cluster_ String,
_log_agent_ String,
_namespace_ String,
_node_name_ String,
_node_ip_ String,
_container_name_ String,
_pod_name_ String,
_raw_log_ String
)
ENGINE = MergeTree PARTITION BY toYYYYMMDD(_time_second_)
ORDER BY _time_second_
TTL toDateTime(_time_second_) + INTERVAL 3 DAY
SETTINGS index_granularity = 8192;`,
}, {
name: "test-2",
args: args{
builder: new(standalone.StreamBuilder),
params: bumo.Params{
TableName: "dev.app_stdout_stream",
Days: 3,
TimeTyp: "String", // 1 string 2 float
Brokers: "kafka:9092",
Topic: "topic",
Group: "app_stdout",
ConsumerNum: 1,
},
},
want: `CREATE TABLE dev.app_stdout_stream
(
_source_ String,
_pod_name_ String,
_namespace_ String,
_node_name_ String,
_container_name_ String,
_cluster_ String,
_log_agent_ String,
_node_ip_ String,
_time_ String,
_log_ String
)
ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka:9092', kafka_topic_list = 'topic', kafka_group_name = 'app_stdout', kafka_format = 'JSONEachRow', kafka_num_consumers = 1;`,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := Standalone(tt.args.params, tt.args.builder); got != tt.want {
t.Errorf("StandaloneData() = %v, want %v", got, tt.want)
}
})
}
}
67 changes: 67 additions & 0 deletions api/internal/service/inquiry/builder/bumo/model.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package bumo

import (
"strings"
)

// builder model = bumo

// QueryAssembly all in one
type QueryAssembly struct {
Params Params
Create string
Fields string
Where string
Engine string
Order string
TTL string
Setting string
result string
}

type Params struct {
// data
TableName string
Days int
// stream
TimeTyp string
Brokers string
Topic string
Group string
ConsumerNum int
// view
ViewTable string
TargetTable string
TimeField string
CommonFields string
SourceTable string
Where string
}

func (q *QueryAssembly) Gen() string {
var res string
if q.Create != "" {
res += q.Create
}
if q.Fields != "" {
res += q.Fields
}
if q.Where != "" {
res += q.Where
}
if q.Engine != "" {
res += q.Engine
}
if q.Order != "" {
res += q.Order
}
if q.TTL != "" {
res += q.TTL
}
if q.Setting != "" {
res += q.Setting
}
res = strings.TrimSuffix(res, "\n")
res += ";"
return res
}
4 changes: 4 additions & 0 deletions api/internal/service/inquiry/builder/cluster/cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package cluster

// ClusterBuilder clickhouse cluster version
type ClusterBuilder struct{}
24 changes: 24 additions & 0 deletions api/internal/service/inquiry/builder/director.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package builder

import (
"github.com/shimohq/mogo/api/internal/service/inquiry/builder/bumo"
)

type Director struct {
builder Builder
}

func (d *Director) SetBuilder(builder Builder) {
d.builder = builder
}

func (d *Director) Generate(params bumo.Params) *bumo.QueryAssembly {
d.builder.NewProject(params)
d.builder.BuilderCreate()
d.builder.BuilderFields()
d.builder.BuilderEngine()
d.builder.BuilderOrder()
d.builder.BuilderTTL()
d.builder.BuilderSetting()
return d.builder.GetResult().(*bumo.QueryAssembly)
}
60 changes: 60 additions & 0 deletions api/internal/service/inquiry/builder/standalone/data.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package standalone

import (
"fmt"

"github.com/shimohq/mogo/api/internal/service/inquiry/builder/bumo"
)

// DataBuilder stand-alone cluster version
// _time_ version string/float is the same sql, so we use the same data builder to finish the job.
type DataBuilder struct {
QueryAssembly *bumo.QueryAssembly
}

func (b *DataBuilder) NewProject(params bumo.Params) {
b.QueryAssembly = new(bumo.QueryAssembly)
b.QueryAssembly.Params = params
}

func (b *DataBuilder) BuilderCreate() {
b.QueryAssembly.Create = fmt.Sprintf("CREATE TABLE %s\n", b.QueryAssembly.Params.TableName)
}

func (b *DataBuilder) BuilderFields() {
b.QueryAssembly.Fields = `(
_time_second_ DateTime,
_time_nanosecond_ DateTime64(9, 'Asia/Shanghai'),
_source_ String,
_cluster_ String,
_log_agent_ String,
_namespace_ String,
_node_name_ String,
_node_ip_ String,
_container_name_ String,
_pod_name_ String,
_raw_log_ String
)
`
}

func (b *DataBuilder) BuilderWhere() {
}

func (b *DataBuilder) BuilderEngine() {
b.QueryAssembly.Engine = "ENGINE = MergeTree PARTITION BY toYYYYMMDD(_time_second_)\n"
}

func (b *DataBuilder) BuilderOrder() {
b.QueryAssembly.Order = "ORDER BY _time_second_\n"
}

func (b *DataBuilder) BuilderTTL() {
b.QueryAssembly.TTL = fmt.Sprintf("TTL toDateTime(_time_second_) + INTERVAL %d DAY\n", b.QueryAssembly.Params.Days)
}

func (b *DataBuilder) BuilderSetting() {
b.QueryAssembly.Setting = "SETTINGS index_granularity = 8192\n"
}

func (b *DataBuilder) GetResult() interface{} { return b.QueryAssembly }
54 changes: 54 additions & 0 deletions api/internal/service/inquiry/builder/standalone/stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package standalone

import (
"fmt"

"github.com/shimohq/mogo/api/internal/service/inquiry/builder/bumo"
)

// StreamBuilder stand-alone cluster version
type StreamBuilder struct {
QueryAssembly *bumo.QueryAssembly
}

func (b *StreamBuilder) NewProject(params bumo.Params) {
b.QueryAssembly = new(bumo.QueryAssembly)
b.QueryAssembly.Params = params
}

func (b *StreamBuilder) BuilderCreate() {
b.QueryAssembly.Create = fmt.Sprintf("CREATE TABLE %s\n", b.QueryAssembly.Params.TableName)
}

func (b *StreamBuilder) BuilderFields() {
b.QueryAssembly.Fields = fmt.Sprintf(`(
_source_ String,
_pod_name_ String,
_namespace_ String,
_node_name_ String,
_container_name_ String,
_cluster_ String,
_log_agent_ String,
_node_ip_ String,
_time_ %s,
_log_ String
)
`, b.QueryAssembly.Params.TimeTyp)
}

func (b *StreamBuilder) BuilderWhere() {
}

func (b *StreamBuilder) BuilderEngine() {
b.QueryAssembly.Engine = fmt.Sprintf("ENGINE = Kafka SETTINGS kafka_broker_list = '%s', kafka_topic_list = '%s', kafka_group_name = '%s', kafka_format = 'JSONEachRow', kafka_num_consumers = %d\n",
b.QueryAssembly.Params.Brokers, b.QueryAssembly.Params.Topic,
b.QueryAssembly.Params.Group, b.QueryAssembly.Params.ConsumerNum)
}

func (b *StreamBuilder) BuilderOrder() {}

func (b *StreamBuilder) BuilderTTL() {}

func (b *StreamBuilder) BuilderSetting() {}

func (b *StreamBuilder) GetResult() interface{} { return b.QueryAssembly }
51 changes: 51 additions & 0 deletions api/internal/service/inquiry/builder/standalone/view.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package standalone

import (
"fmt"

"github.com/shimohq/mogo/api/internal/service/inquiry/builder/bumo"
)

// ViewBuilder stand-alone cluster version
type ViewBuilder struct {
QueryAssembly *bumo.QueryAssembly
}

func (b *ViewBuilder) NewProject(params bumo.Params) {
b.QueryAssembly = new(bumo.QueryAssembly)
b.QueryAssembly.Params = params
}

func (b *ViewBuilder) BuilderCreate() {
b.QueryAssembly.Create = fmt.Sprintf("CREATE MATERIALIZED VIEW %s TO %s AS\n", b.QueryAssembly.Params.ViewTable, b.QueryAssembly.Params.TargetTable)
}

func (b *ViewBuilder) BuilderFields() {
b.QueryAssembly.Fields = fmt.Sprintf(`SELECT
%s,
_source_,
_cluster_,
_log_agent_,
_namespace_,
_node_name_,
_node_ip_,
_container_name_,
_pod_name_,
_log_ AS _raw_log_%s
FROM %s
`, b.QueryAssembly.Params.TimeField, b.QueryAssembly.Params.CommonFields, b.QueryAssembly.Params.SourceTable)
}

func (b *ViewBuilder) BuilderWhere() {
b.QueryAssembly.Where = fmt.Sprintf("WHERE %s\n", b.QueryAssembly.Params.Where)
}

func (b *ViewBuilder) BuilderEngine() {}

func (b *ViewBuilder) BuilderOrder() {}

func (b *ViewBuilder) BuilderTTL() {}

func (b *ViewBuilder) BuilderSetting() {}

func (b *ViewBuilder) GetResult() interface{} { return b.QueryAssembly }

0 comments on commit 7ef1516

Please sign in to comment.