Skip to content

Commit

Permalink
release 130.23
Browse files Browse the repository at this point in the history
  • Loading branch information
yilun zhang committed Mar 8, 2024
1 parent bba9593 commit 4b47e2e
Show file tree
Hide file tree
Showing 43 changed files with 3,758 additions and 3,388 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1552,6 +1552,7 @@ if err != nil {
### 5.4. 取消订阅

每一个订阅都有一个订阅主题 `topic` 作为唯一标识。如果订阅时 `topic` 已经存在,那么会订阅失败。这时需要通过 `UnSubscribe` 函数取消订阅才能再次订阅。
在使用中注意取消订阅后,通过回调获取数据的流订阅可能还会执行一会儿回调函数,然后才会结束订阅过程。

```go
err = client.UnSubscribe(req)
Expand All @@ -1562,7 +1563,7 @@ if err != nil {

### 5.5. 异构流表数据的处理

DolphinDB 自 1.30.17 及 2.00.5 版本开始,支持通过 replay 函数将多个结构不同的流数据表回放(序列化)到一个流表里,这个流表被称为异构流表。Go API 自 1.30.22 版本开始新增 `streamDeserializer` 类,用于构造异构流表反序列化器,以实现对异构流表的订阅和反序列化操作。
DolphinDB 自 1.30.17 及 2.00.5 版本开始,支持通过 replay 函数将多个结构不同的流数据表回放(序列化)到一个流表里,这个流表被称为异构流表。Go API 自 1.30.22 版本开始新增 `streamDeserializer` 类,用于构造异构流表反序列化器,以实现对异构流表的订阅和反序列化操作。目前异构流表的反序列化不支持 decimal 类型。

#### 5.5.1 异构流表反序列化器

Expand Down
11 changes: 5 additions & 6 deletions model/dataform.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,20 @@ func newCategory(dataForm, datatype byte) *Category {
}

func (cg *Category) render(w *protocol.Writer) error {
return w.Write([]byte{byte(cg.DataType), byte(cg.DataForm)})
return w.Write(protocol.ByteSliceFromInt16Slice([]int16{int16(cg.DataForm) << 8 + int16(cg.DataType)}))
}

func parseCategory(r protocol.Reader) (*Category, error) {
c, err := r.ReadCertainBytes(2)
func parseCategory(r protocol.Reader, bo protocol.ByteOrder) (*Category, error) {
c, err := readShort(r, bo)
if err != nil {
return nil, errors.ReadDataTypeAndDataFormError(err.Error())
}

return newCategory(c[1], c[0]), nil
return newCategory(byte(c >> 8), byte(c << 8 >> 8)), nil
}

// ParseDataForm parses the raw bytes in r with bo and return a DataForm object.
func ParseDataForm(r protocol.Reader, bo protocol.ByteOrder) (DataForm, error) {
c, err := parseCategory(r)
c, err := parseCategory(r, bo)
if err != nil {
return nil, err
}
Expand Down
16 changes: 14 additions & 2 deletions model/parse_dataform.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func parseTable(r protocol.Reader, bo protocol.ByteOrder, c *Category) (*Table,
}

func parseVectorWithCategory(r protocol.Reader, bo protocol.ByteOrder) (*Vector, error) {
c, err := parseCategory(r)
c, err := parseCategory(r, bo)
if err != nil {
return nil, err
}
Expand All @@ -130,7 +130,7 @@ func parseVectorWithCategoryList(r protocol.Reader, bo protocol.ByteOrder, count
list := make([]*Vector, count)
var symBase *symbolBaseCollection
for i := 0; i < count; i++ {
c, err := parseCategory(r)
c, err := parseCategory(r, bo)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -230,6 +230,18 @@ func readVectorData(r protocol.Reader, bo protocol.ByteOrder, dv *Vector) error
return err
}

func ParseArrayVector(r protocol.Reader, t DataTypeByte, bo protocol.ByteOrder) (*Vector, error) {
vct := &Vector{
category: &Category{DataForm: DfVector,DataType: t},
RowCount: 1,
}
err := parseArrayVector(r, bo, vct)
if err != nil {
return nil, err
}
return vct, nil
}

func parseArrayVector(r protocol.Reader, bo protocol.ByteOrder, dv *Vector) error {
var err error
dt := dv.GetDataType() - 64
Expand Down
13 changes: 13 additions & 0 deletions model/vector.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func (vct *Vector) Set(ind int, d DataType) error {

// Get gets DataType from vct.
// If ind exceeds the size of Vector, return nil.
// ArrayVector does not support Combine.
func (vct *Vector) Get(ind int) DataType {
if ind >= vct.Rows()*int(vct.ColumnCount) && vct.ArrayVector == nil {
return nil
Expand Down Expand Up @@ -229,6 +230,18 @@ func (vct *Vector) GetVectorValue(ind int) *Vector {
return nil
}

// AppendVectorValue appends the vector to arrayVector.
func (vct *Vector) AppendVectorValue(data *Vector) (err error) {
if(vct.category.DataType != data.GetDataType() + 64) {
return fmt.Errorf("mismatched type, expect %s actual %s", GetDataTypeString(vct.category.DataType-64), data.GetDataTypeString())
}
arrayVec := NewArrayVector([]*Vector{data})
vct.ArrayVector = append(vct.ArrayVector, arrayVec...)
vct.ColumnCount += 1
vct.RowCount += 1
return nil
}

// GetDataType returns the byte type of the DataType.
func (vct *Vector) GetDataType() DataTypeByte {
return vct.category.DataType
Expand Down
19 changes: 14 additions & 5 deletions streaming/deserializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,21 @@ func (md *msgDeserializer) parse(data []byte) (*model.Vector, error) {

scalarList := make([]model.DataForm, len(md.colTypes))
for k, v := range md.colTypes {
dt, err := model.ParseDataType(rd, v, protocol.LittleEndian)
if err != nil {
return nil, err
if(v >= 64) {
vct, err := model.ParseArrayVector(rd, v, protocol.LittleEndian)
if err != nil {
return nil, err
}
scalarList[k] = vct

} else {
dt, err := model.ParseDataType(rd, v, protocol.LittleEndian)
if err != nil {
return nil, err
}

scalarList[k] = model.NewScalar(dt)
}

scalarList[k] = model.NewScalar(dt)
}

dtl, err := model.NewDataTypeListFromRawData(model.DtAny, scalarList)
Expand Down
4 changes: 2 additions & 2 deletions streaming/goroutine_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,14 @@ func (t *GoroutineClient) subscribe(req *SubscribeRequest) error {

t.handlerLoppers.Store(topicStr, handlerLooper)

go handlerLooper.run()

return nil
}

func (t *GoroutineClient) reviseSubscriber(req *SubscribeRequest) error {
var err error
t.subscriber.once.Do(func() {
fmt.Println("do it")
err = t.subscriber.checkServerVersion(req.Address)
if err == nil {
go listening(t)
Expand Down Expand Up @@ -113,7 +114,6 @@ func (t *GoroutineClient) initHandlerLooper(queue *UnboundedChan, req *Subscribe
// handlerLooper.handler = &DefaultMessageHandler{}
// }

go handlerLooper.run()

return handlerLooper
}
Expand Down
120 changes: 120 additions & 0 deletions streaming/goroutine_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package streaming

import (
"context"
"fmt"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -286,3 +287,122 @@ func TestGoroutineClient(t *testing.T) {

time.Sleep(2 * time.Second)
}

type unsubscribeHandler struct {
times int
msgs []IMessage
}

var subReq *SubscribeRequest
var client *GoroutineClient
var ch chan bool

func (s *unsubscribeHandler) DoEvent(msg IMessage) {
s.times += 1
if s.times >= 2 {
fmt.Println("DoEvent: start to UnSubscribe")
client.UnSubscribe(subReq)
ch <- true
}
s.msgs = append(s.msgs, msg)
}

func TestUnsubscribeInDoEvent(t *testing.T) {
ch = make(chan bool)
host := "localhost:8848"
db, err := api.NewDolphinDBClient(context.TODO(), host, nil)

util.AssertNil(err)
loginReq := &api.LoginRequest{
UserID: "admin",
Password: "123456",
}

err = db.Connect()
util.AssertNil(err)

err = db.Login(loginReq)
util.AssertNil(err)

_, err = db.RunScript(scripts)
util.AssertNil(err)

client = NewGoroutineClient("localhost", 12345)

sh := unsubscribeHandler{}
throttle := float32(1)
subReq = &SubscribeRequest{
Address: "localhost:8848",
TableName: "outTables",
ActionName: "action1",
MsgAsTable: false,
Handler: &sh,
Offset: 0,
Reconnect: true,
Throttle: &throttle,
}

err = client.Subscribe(subReq)
assert.Nil(t, err)
time.Sleep(time.Duration(1))
<-ch
fmt.Println("TestUnsubscribeInDoEvent test finish ")
}



var arrayVectorStreamScript = "st1 = streamTable(100:0, `arrayInt`timestampv`sym,[INT[],TIMESTAMP,SYMBOL]);" +
"share st1 as outTables;" +
"outTables.append!(table(arrayVector([4], [1,2,3,4]) as arrayInt, [2018.12.01T01:21:23.000] as timestampv, [`a] as sym));"

type arrayHandle struct {
msgs []IMessage
}

func (s *arrayHandle) DoEvent(msg IMessage) {
fmt.Println(msg.GetValue(1).GetDataTypeString(), msg.GetValue(1).GetDataFormString(), msg)
fmt.Println(msg.GetValue(1))
s.msgs = append(s.msgs, msg)
}

func TestArrayVectorStream(t *testing.T) {
host := "localhost:8848"
db, err := api.NewDolphinDBClient(context.TODO(), host, nil)

util.AssertNil(err)
loginReq := &api.LoginRequest{
UserID: "admin",
Password: "123456",
}

err = db.Connect()
util.AssertNil(err)

err = db.Login(loginReq)
util.AssertNil(err)

_, err = db.RunScript(arrayVectorStreamScript) //script defined in goroutine_client_test.go
util.AssertNil(err)

client := NewGoroutineClient("localhost", 12345)

sh := arrayHandle{make([]IMessage, 0)}
throttle := float32(1)
req := &SubscribeRequest{
Address: "localhost:8848",
TableName: "outTables",
ActionName: "action1",
MsgAsTable: false,
Handler: &sh,
Offset: 0,
Reconnect: true,
Throttle: &throttle,
}
err = client.Subscribe(req)
util.AssertNil(err)

time.Sleep(time.Duration(3) * time.Second)

assert.Equal(t, 1, len(sh.msgs))

}
23 changes: 15 additions & 8 deletions streaming/handler_looper.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,13 @@ func (h *handlerLopper) stop() {
default:
close(h.exit)
}
<- h.affirmExit
}

func (h *handlerLopper) run() {
h.exit = make(chan bool)
h.affirmExit = make(chan bool, 1)
for {
select {
case <-h.exit:
h.affirmExit <- true;
return
default:
h.handleMessage()
Expand Down Expand Up @@ -118,7 +115,9 @@ func (h *handlerLopper) handleMessage() {
if err != nil {
fmt.Printf("merge msg to table failed: %s\n", err.Error())
}
h.handler.DoEvent(ret)
if !h.isStopped() {
h.handler.DoEvent(ret)
}
} else if (h.batchSize != nil && *h.batchSize >= 1) {
if(h.MsgDeserializer != nil) {
outMsg := make([]IMessage, 0)
Expand All @@ -130,9 +129,13 @@ func (h *handlerLopper) handleMessage() {
outMsg = append(outMsg, ret)
}
}
h.batchHandler.DoEvent(outMsg)
if !h.isStopped() {
h.batchHandler.DoEvent(outMsg)
}
} else {
h.batchHandler.DoEvent(msg)
if !h.isStopped() {
h.batchHandler.DoEvent(msg)
}
}
} else {
for _, v := range msg {
Expand All @@ -141,10 +144,14 @@ func (h *handlerLopper) handleMessage() {
if err != nil {
fmt.Printf("StreamDeserializer parse failed: %s\n", err.Error())
} else {
h.handler.DoEvent(ret)
if !h.isStopped() {
h.handler.DoEvent(ret)
}
}
} else {
h.handler.DoEvent(v)
if !h.isStopped() {
h.handler.DoEvent(v)
}
}
}
}
Expand Down
8 changes: 6 additions & 2 deletions streaming/listening.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,16 @@ func listening(c AbstractClient) {
fmt.Print("")
var conn net.Conn
var ok bool
var isReversed bool
if int(c.getSubscriber().listeningPort) == 0 {
isReversed = true
conn, ok = c.getConn()
if !ok {
runtime.Gosched()
continue;
}
} else {
isReversed = false
connTcp, err := ln.AcceptTCP()
if err != nil {
fmt.Printf("Failed to accept tcp: %s\n", err.Error())
Expand All @@ -58,7 +61,7 @@ func listening(c AbstractClient) {
conn = connTcp
}

err = receiveData(ctx, conn, c)
err = receiveData(ctx, conn, c, isReversed)
if err != nil {
runtime.Gosched()
// time.Sleep(100 * time.Millisecond)
Expand All @@ -74,12 +77,13 @@ func listening(c AbstractClient) {
}
}

func receiveData(ctx context.Context, conn net.Conn, c AbstractClient) error {
func receiveData(ctx context.Context, conn net.Conn, c AbstractClient, isReversed bool) error {
mp := &messageParser{
ctx: ctx,
Conn: conn,
subscriber: c.getSubscriber(),
topicNameToIndex: make(map[string]map[string]int),
isReversed: isReversed,
}

go mp.run()
Expand Down
Loading

0 comments on commit 4b47e2e

Please sign in to comment.