Skip to content

Commit

Permalink
search index aggregation & group by (#66)
Browse files Browse the repository at this point in the history
  • Loading branch information
chuanwang66 authored and zhoucan1990 committed Dec 25, 2019
1 parent 3652381 commit 236de45
Show file tree
Hide file tree
Showing 54 changed files with 7,763 additions and 289 deletions.
6 changes: 6 additions & 0 deletions sample.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,10 @@ func main() {

// globalindex
sample.CreateTableWithGlobalIndexSample(client, "globalindex1")

//SearchIndex: agg & group by
sample.CreateSearchIndexForAggregationAndGroupBy(client, "agg_sample_table", "agg_sample_index")
sample.WriteDataForAggregationAndGroupBy(client, "agg_sample_table")
sample.AggregationSample(client, "agg_sample_table", "agg_sample_index")
sample.GroupBySample(client, "agg_sample_table", "agg_sample_index")
}
322 changes: 321 additions & 1 deletion sample/SearchIndexOperation.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,77 @@ func CreateSearchIndexWithIndexSort(client *tablestore.TableStoreClient, tableNa
fmt.Println("CreateSearchIndex finished, requestId:", resp.ResponseInfo.RequestId)
}

/**
*创建一个SearchIndex,为Aggregation和GroupBy的demo做准备
*/
func CreateSearchIndexForAggregationAndGroupBy(client *tablestore.TableStoreClient, tableName string, indexName string) {
fmt.Println("Begin to create table:", tableName)
createTableRequest := new(tablestore.CreateTableRequest)

tableMeta := new(tablestore.TableMeta)
tableMeta.TableName = tableName
tableMeta.AddPrimaryKeyColumn("pk1", tablestore.PrimaryKeyType_STRING)
tableOption := new(tablestore.TableOption)
tableOption.TimeToAlive = -1
tableOption.MaxVersion = 1
reservedThroughput := new(tablestore.ReservedThroughput)
reservedThroughput.Readcap = 0
reservedThroughput.Writecap = 0
createTableRequest.TableMeta = tableMeta
createTableRequest.TableOption = tableOption
createTableRequest.ReservedThroughput = reservedThroughput

_, err := client.CreateTable(createTableRequest)
if err != nil {
fmt.Println("Failed to create table with error:", err)
} else {
fmt.Println("Create table finished")
}

// create search index
fmt.Println("Begin to create index:", indexName)
request := &tablestore.CreateSearchIndexRequest{}
request.TableName = tableName // 设置表名
request.IndexName = indexName // 设置索引名

var schemas []*tablestore.FieldSchema
field1 := &tablestore.FieldSchema{
FieldName: proto.String("Col_Keyword"), // 设置字段名,使用proto.String用于获取字符串指针
FieldType: tablestore.FieldType_KEYWORD, // 设置字段类型
Index: proto.Bool(true), // 设置开启索引
EnableSortAndAgg: proto.Bool(true), // 设置开启排序与统计功能
}
field2 := &tablestore.FieldSchema{
FieldName: proto.String("Col_Keyword2"), // 设置字段名,使用proto.String用于获取字符串指针
FieldType: tablestore.FieldType_KEYWORD, // 设置字段类型
Index: proto.Bool(true), // 设置开启索引
EnableSortAndAgg: proto.Bool(true), // 设置开启排序与统计功能
}
field3 := &tablestore.FieldSchema{
FieldName: proto.String("Col_Long"),
FieldType: tablestore.FieldType_LONG,
Index: proto.Bool(true),
EnableSortAndAgg: proto.Bool(true),
}
field4 := &tablestore.FieldSchema{
FieldName: proto.String("Col_GeoPoint"),
FieldType: tablestore.FieldType_GEO_POINT,
Index: proto.Bool(true),
EnableSortAndAgg: proto.Bool(true),
}
schemas = append(schemas, field1, field2, field3, field4)

request.IndexSchema = &tablestore.IndexSchema{
FieldSchemas: schemas, // 设置SearchIndex包含的字段
}
resp, err := client.CreateSearchIndex(request) // 调用client创建SearchIndex
if err != nil {
fmt.Println("error :", err)
return
}
fmt.Println("CreateSearchIndex finished, requestId:", resp.ResponseInfo.RequestId)
}

func ListSearchIndex(client *tablestore.TableStoreClient, tableName string) {
request := &tablestore.ListSearchIndexRequest{}
request.TableName = tableName
Expand Down Expand Up @@ -181,6 +252,50 @@ func WriteData(client *tablestore.TableStoreClient, tableName string) {
}
}

/**
* 为Aggregation和GroupBy测试插入数据
*/
func WriteDataForAggregationAndGroupBy(client *tablestore.TableStoreClient, tableName string) {
fmt.Println("Begin to write data")
keywords := []string {"hangzhou", "tablestore", "ots"}
keywords2 := []string {"red", "blue"}
geopoints := []string {
"30.137817,120.08681", //飞天园区
"30.135131,120.088355",//中大银座
"30.181877,120.152818",//中医药地铁站
"30.20223,120.13787",//六和塔
"30.216961,120.157633",//八卦田
"30.231566,120.148578",//太子湾
"30.26058,120.170712", //龙翔桥
"30.269501,120.169347",//凤起路
"30.28073,120.168843",//运河
"30.296946,120.21958",//杭州东站
}

for i := 0; i < 10; i++ {
putRowRequest := new(tablestore.PutRowRequest)
putRowChange := new(tablestore.PutRowChange)
putRowChange.TableName = tableName
putPk := new(tablestore.PrimaryKey)
putPk.AddPrimaryKeyColumn("pk1", fmt.Sprintf("pk_%d", i))

putRowChange.PrimaryKey = putPk
putRowChange.AddColumn("Col_Keyword", keywords[i%len(keywords)])
putRowChange.AddColumn("Col_Keyword2", keywords2[i%len(keywords2)])
if i != 0 {
putRowChange.AddColumn("Col_Long", int64(i))
}
putRowChange.AddColumn("Col_GeoPoint", geopoints[i])
putRowChange.SetCondition(tablestore.RowExistenceExpectation_IGNORE)
putRowRequest.PutRowChange = putRowChange
_, err := client.PutRow(putRowRequest)

if err != nil {
fmt.Println("putrow failed with error:", err)
}
}
}

/**
* 使用Token进行翻页读取。
* 如果SearchResponse返回了NextToken,可以使用这个Token发起下一次查询,
Expand Down Expand Up @@ -695,7 +810,7 @@ func BoolQuery(client *tablestore.TableStoreClient, tableName string, indexName
}

/**
*创建一个SearchIndex,为TEXT类型索引列自定义分词器
* 创建一个SearchIndex,为TEXT类型索引列自定义分词器
*/
func Analysis(client *tablestore.TableStoreClient, tableName string, indexName string) {
fmt.Println("Begin to create table:", tableName)
Expand Down Expand Up @@ -941,3 +1056,208 @@ func Analysis(client *tablestore.TableStoreClient, tableName string, indexName s
}
}
}


/**
* Aggregation示例
*/
func AggregationSample(client *tablestore.TableStoreClient, tableName string, indexName string) {
searchRequest := &tablestore.SearchRequest{}

searchRequest.
SetTableName(tableName). //设置表名
SetIndexName(indexName). //设置多元索引名
SetSearchQuery(search.NewSearchQuery().
SetQuery(&search.MatchAllQuery{}). //匹配所有行
SetLimit(100). //限制返回前100行结果
Aggregation(search.NewAvgAggregation("agg1", "Col_Long")). //计算Col_Long字段的平均值
Aggregation(search.NewDistinctCountAggregation("agg2", "Col_Long")). //计算Col_Long字段不同取值的个数
Aggregation(search.NewMaxAggregation("agg3", "Col_Long")). //计算Col_Long字段的最大值
Aggregation(search.NewSumAggregation("agg4", "Col_Long")). //计算Col_Long字段的和
Aggregation(search.NewCountAggregation("agg5", "Col_Long"))) //计算存在Col_Long字段的行数

// 设置返回所有列
searchRequest.SetColumnsToGet(&tablestore.ColumnsToGet{
ReturnAll: true,
})
searchResponse, err := client.Search(searchRequest)
if err != nil {
fmt.Printf("%#v", err)
return
}
fmt.Println("RequestId: ", searchResponse.RequestId)
fmt.Println("IsAllSuccess: ", searchResponse.IsAllSuccess)
fmt.Println("RowCount: ", len(searchResponse.Rows))
for _, row := range searchResponse.Rows {
jsonBody, err := json.Marshal(row)
if err != nil {
panic(err)
}
fmt.Println("Row: ", string(jsonBody))
}
aggResults := searchResponse.AggregationResults //获取所有统计结果

//avg agg
agg1, err := aggResults.Avg("agg1") //获取名字为"agg1"的Aggregation结果,类型为Avg
if err != nil {
panic(err)
}
if agg1.HasValue() { //名字为"agg1"的Aggregation结果 是否Value值
fmt.Println("(avg) agg1: ", agg1.Value) //打印Col_Long字段平均值
} else {
fmt.Println("(avg) agg1: no value") //所有行都不存在Col_Long字段
}

//distinct count agg
agg2, err := aggResults.DistinctCount("agg2") //获取名字为"agg2"的Aggregation结果,类型为DistinctCount
if err != nil {
panic(err)
}
fmt.Println("(distinct) agg2: ", agg2.Value) //打印Col_Long字段不同取值的个数

//max agg
agg3, err := aggResults.Max("agg3") //获取名字为"agg3"的Aggregation结果,类型为Max
if err != nil {
panic(err)
}
if agg3.HasValue() {
fmt.Println("(max) agg3: ", agg3.Value) //打印Col_Long字段最大值
} else {
fmt.Println("(max) agg3: no value") //所有行都不存在Col_Long字段
}

//sum agg
agg4, err := aggResults.Sum("agg4") //获取名字为"agg4"的Aggregation结果,类型为Sum
if err != nil {
panic(err)
}
fmt.Println("(sum) agg4: ", agg4.Value) //打印Col_Long字段的和

//count agg
agg5, err := aggResults.Count("agg5") //获取名字为"agg5"的Aggregation结果,类型为Count
if err != nil {
panic(err)
}
fmt.Println("(count) agg6: ", agg5.Value) //打印存在Col_Long字段的个数
}

/**
* GroupBy示例
*/
func GroupBySample(client *tablestore.TableStoreClient, tableName string, indexName string) {
searchRequest := &tablestore.SearchRequest{}

searchRequest.
SetTableName(tableName). //设置表名
SetIndexName(indexName). //设置多元索引名
SetSearchQuery(search.NewSearchQuery().
SetQuery(&search.MatchAllQuery{}). //匹配所有行
SetLimit(100). //限制返回前100行结果
GroupBy(search.NewGroupByField("group1", "Col_Keyword"). //对Col_Keyword字段做GroupByField取值聚合
GroupBySorters([]search.GroupBySorter{}). //可以指定返回结果分桶的顺序
Size(2). //仅返回前2个分桶
SubAggregation(search.NewAvgAggregation("sub_agg1", "Col_Long")). //对每个分桶进行子统计(Aggregation)
SubGroupBy(search.NewGroupByField("sub_group1", "Col_Keyword2"))). //对每个分桶进行子聚合(GroupBy)
GroupBy(search.NewGroupByRange("group2", "Col_Long"). //对Col_Long字段做GroupByRange范围
Range(search.NegInf, 3). //第一个分桶包含Col_Long在(-∞, 3)的索引行
Range(3, 5). //第二个分桶包含Col_Long在[3, 5)的索引行
Range(5, search.Inf)). //第三个分桶包含Col_Long在[5, +∞)的索引行
GroupBy(search.NewGroupByFilter("group3"). //做GroupByFilter过滤聚合
Query(&search.TermQuery{ //第一个分桶包含Col_Keyword字段取值为"hangzhou"的索引行
FieldName: "Col_Keyword",
Term: "hangzhou",
}).
Query(&search.RangeQuery{ //第二个分桶包含Col_Long字段取值在[3, 5]范围的索引行
FieldName: "Col_Long",
From: 3,
To: 5,
IncludeLower: true,
IncludeUpper: true})).
GroupBy(search.NewGroupByGeoDistance("group4", "Col_GeoPoint", search.GeoPoint{Lat: 30.137817, Lon:120.08681}). //对Col_GeoPoint字段做GroupByGeoDistance地理范围聚合
Range(search.NegInf, 10000). //第一个分桶包含Col_GeoPoint离中心点距离(-∞, 10km)的索引行
Range(10000, 15000). //第二个分桶包含Col_GeoPoint离中心点距离(10km, 15km)的索引行
Range(15000, search.Inf))) //第三个分桶包含Col_GeoPoint离中心点距离(15km, +∞)的索引行


// 设置返回所有列
searchRequest.SetColumnsToGet(&tablestore.ColumnsToGet{
ReturnAll: true,
})
searchResponse, err := client.Search(searchRequest)
if err != nil {
fmt.Printf("%#v", err)
return
}
fmt.Println("RequestId: ", searchResponse.RequestId)
fmt.Println("IsAllSuccess: ", searchResponse.IsAllSuccess)
fmt.Println("RowCount: ", len(searchResponse.Rows))
for _, row := range searchResponse.Rows {
jsonBody, err := json.Marshal(row)
if err != nil {
panic(err)
}
fmt.Println("Row: ", string(jsonBody))
}
groupByResults := searchResponse.GroupByResults //获取所有聚合结果

group1, err := groupByResults.GroupByField("group1") //获取名字为"group1"的GroupBy结果,类型为GroupByField
if err != nil {
panic(err)
}
fmt.Println("group1: ")
for _, item := range group1.Items { //遍历返回的所有分桶
//item
fmt.Println("\tkey: ", item.Key, ", rowCount: ", item.RowCount) //打印本次分桶的行数

//sub agg
subAgg1, err := item.SubAggregations.Avg("sub_agg1") //获取名字为sub_agg1的子统计的结果
if err != nil {
panic(err)
}
if subAgg1.HasValue() { //如果子统计sub_agg1计算出了Col_Long字段的平均值,则HasValue()返回true
fmt.Println("\t\tsub_agg1: ", subAgg1.Value) //打印本次分桶中,子统计计算出来的Col_Long字段的平均值
}

//sub group by
subGroup1, err := item.SubGroupBys.GroupByField("sub_group1") //获取名字为sub_group1的子聚合的结果
if err != nil {
panic(err)
}
fmt.Println("\t\tsub_group1")
for _, subItem := range subGroup1.Items { //遍历名字为sub_group1的子聚合结果
fmt.Println("\t\t\tkey: ", subItem.Key, ", rowCount: ", subItem.RowCount) //打印sub_group1子聚合的结果分桶,即分桶中的行数
tablestore.Assert(subItem.SubAggregations.Empty(), "")
tablestore.Assert(subItem.SubGroupBys.Empty(), "")
}
}

//group by range
group2, err := groupByResults.GroupByRange("group2") //获取名字为"group2"的GroupBy结果,类型为GroupByRange
if err != nil {
panic(err)
}
fmt.Println("group2: ")
for _, item := range group2.Items { //遍历返回的所有分桶
fmt.Println("\t[", item.From, ", ", item.To, "), rowCount: ", item.RowCount) //打印本次分桶的行数
}

//group by filter
group3, err := groupByResults.GroupByFilter("group3") //获取名字为"group3"的GroupBy结果,类型为GroupByFilter
if err != nil {
panic(err)
}
fmt.Println("group3: ")
for _, item := range group3.Items { //遍历返回的所有分桶
fmt.Println("\trowCount: ", item.RowCount) //打印本次分桶的行数
}

//group by geo distance
group4, err := groupByResults.GroupByGeoDistance("group4") //获取名字为"group4"的GroupBy结果,类型为GroupByGeoDistance
if err != nil {
panic(err)
}
fmt.Println("group4: ")
for _, item := range group4.Items { //遍历返回的所有分桶
fmt.Println("\t[", item.From, ", ", item.To, "), rowCount: ", item.RowCount) //打印本次分桶的行数
}
}
7 changes: 7 additions & 0 deletions tablestore/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,11 @@ type TableStoreApi interface {
DescribeStream(request *DescribeStreamRequest) (*DescribeStreamResponse, error)
GetShardIterator(request *GetShardIteratorRequest) (*GetShardIteratorResponse, error)
GetStreamRecord(request *GetStreamRecordRequest) (*GetStreamRecordResponse, error)

// search related
CreateSearchIndex(request *CreateSearchIndexRequest) (*CreateSearchIndexResponse, error)
DeleteSearchIndex(request *DeleteSearchIndexRequest) (*DeleteSearchIndexResponse, error)
ListSearchIndex(request *ListSearchIndexRequest) (*ListSearchIndexResponse, error)
DescribeSearchIndex(request *DescribeSearchIndexRequest) (*DescribeSearchIndexResponse, error)
Search(request *SearchRequest) (*SearchResponse, error)
}
Loading

0 comments on commit 236de45

Please sign in to comment.