diff --git a/client_test.go b/client_test.go index 63d8551..b30f187 100644 --- a/client_test.go +++ b/client_test.go @@ -307,6 +307,92 @@ func TestWriteMonitors(t *testing.T) { } } +func TestUpdateMonitor(t *testing.T) { + loc, err := time.LoadLocation(timezone) + assert.Nil(t, err) + ts1 := time.Now().Add(-1 * time.Minute).UnixMilli() + time1 := time.UnixMilli(ts1).In(loc) + ts2 := time.Now().Add(-2 * time.Minute).UnixMilli() + time2 := time.UnixMilli(ts2).In(loc) + + monitors := []monitor{ + { + ID: randomId(), + Host: "127.0.0.1", + Memory: 1, + Cpu: 1.0, + Temperature: -1, + Ts: time1, + Running: true, + }, + { + ID: randomId(), + Host: "127.0.0.2", + Memory: 2, + Cpu: 2.0, + Temperature: -2, + Ts: time2, + Running: true, + }, + } + + table, err := tbl.New(monitorTableName) + assert.Nil(t, err) + + assert.Nil(t, table.AddTagColumn("id", types.INT64)) + assert.Nil(t, table.AddTagColumn("host", types.STRING)) + assert.Nil(t, table.AddFieldColumn("memory", types.UINT64)) + assert.Nil(t, table.AddFieldColumn("cpu", types.FLOAT64)) + assert.Nil(t, table.AddFieldColumn("temperature", types.INT64)) + assert.Nil(t, table.AddFieldColumn("running", types.BOOLEAN)) + assert.Nil(t, table.AddTimestampColumn("ts", types.TIMESTAMP_MILLISECOND)) + + for _, monitor := range monitors { + err := table.AddRow(monitor.ID, monitor.Host, + monitor.Memory, monitor.Cpu, monitor.Temperature, monitor.Running, + monitor.Ts) + assert.Nil(t, err) + } + + resp, err := cli.Write(context.Background(), table) + assert.Nil(t, err) + assert.Zero(t, resp.GetHeader().GetStatus().GetStatusCode()) + assert.Empty(t, resp.GetHeader().GetStatus().GetErrMsg()) + assert.Equal(t, uint32(len(monitors)), resp.GetAffectedRows().GetValue()) + + // create a new table to update the monitor + utable, err := tbl.New(monitorTableName) + assert.Nil(t, err) + + assert.Nil(t, utable.AddTagColumn("id", types.INT64)) + assert.Nil(t, utable.AddTagColumn("host", types.STRING)) + assert.Nil(t, utable.AddFieldColumn("memory", types.UINT64)) + assert.Nil(t, utable.AddFieldColumn("cpu", types.FLOAT64)) + assert.Nil(t, utable.AddFieldColumn("temperature", types.INT64)) + assert.Nil(t, utable.AddFieldColumn("running", types.BOOLEAN)) + assert.Nil(t, utable.AddTimestampColumn("ts", types.TIMESTAMP_MILLISECOND)) + + monitors[0].Cpu = 1.1 + updatedMonitor := monitors[0] + + err = utable.AddRow(updatedMonitor.ID, updatedMonitor.Host, + updatedMonitor.Memory, updatedMonitor.Cpu, updatedMonitor.Temperature, updatedMonitor.Running, + updatedMonitor.Ts) + assert.Nil(t, err) + + resp, err = cli.Write(context.Background(), utable) + assert.Nil(t, err) + assert.Zero(t, resp.GetHeader().GetStatus().GetStatusCode()) + assert.Empty(t, resp.GetHeader().GetStatus().GetErrMsg()) + assert.Equal(t, uint32(1), resp.GetAffectedRows().GetValue()) + monitors_, err := db.Query(fmt.Sprintf("select * from %s where id = %d order by host asc", monitorTableName, updatedMonitor.ID)) + + assert.Nil(t, err) + assert.Equal(t, 1, len(monitors_)) + assert.Equal(t, updatedMonitor, monitors_[0]) + +} + func TestDeleteMonitors(t *testing.T) { loc, err := time.LoadLocation(timezone) assert.Nil(t, err) @@ -401,7 +487,7 @@ func TestDeleteMonitors(t *testing.T) { } } -func TestCreateMonitors(t *testing.T) { +func TestWriteObjMonitors(t *testing.T) { loc, err := time.LoadLocation(timezone) assert.Nil(t, err) ts1 := time.Now().Add(-1 * time.Minute).UnixMilli() @@ -446,6 +532,56 @@ func TestCreateMonitors(t *testing.T) { } } +func TestUpdateObjMonitor(t *testing.T) { + loc, err := time.LoadLocation(timezone) + assert.Nil(t, err) + ts1 := time.Now().Add(-1 * time.Minute).UnixMilli() + time1 := time.UnixMilli(ts1).In(loc) + ts2 := time.Now().Add(-2 * time.Minute).UnixMilli() + time2 := time.UnixMilli(ts2).In(loc) + + monitors := []monitor{ + { + ID: randomId(), + Host: "127.0.0.1", + Memory: 1, + Cpu: 1.0, + Temperature: -1, + Ts: time1, + Running: true, + }, + { + ID: randomId(), + Host: "127.0.0.2", + Memory: 2, + Cpu: 2.0, + Temperature: -2, + Ts: time2, + Running: true, + }, + } + + resp, err := cli.WriteObject(context.Background(), monitors) + assert.Nil(t, err) + assert.Zero(t, resp.GetHeader().GetStatus().GetStatusCode()) + assert.Empty(t, resp.GetHeader().GetStatus().GetErrMsg()) + assert.Equal(t, uint32(len(monitors)), resp.GetAffectedRows().GetValue()) + + monitors[0].Cpu = 1.1 + updateMonitor := monitors[0] + + resp, err = cli.WriteObject(context.Background(), updateMonitor) + assert.Nil(t, err) + assert.Zero(t, resp.GetHeader().GetStatus().GetStatusCode()) + assert.Empty(t, resp.GetHeader().GetStatus().GetErrMsg()) + assert.Equal(t, uint32(1), resp.GetAffectedRows().GetValue()) + + monitors_, err := db.Query(fmt.Sprintf("select * from %s where id = %d order by host asc", monitorTableName, updateMonitor.ID)) + assert.Nil(t, err) + assert.Equal(t, 1, len(monitors_)) + assert.Equal(t, updateMonitor, monitors_[0]) + +} func TestDeleteObjMonitors(t *testing.T) { loc, err := time.LoadLocation(timezone) assert.Nil(t, err) @@ -493,8 +629,8 @@ func TestDeleteObjMonitors(t *testing.T) { assert.Equal(t, uint32(len(monitors)), resp.GetAffectedRows().GetValue()) deleteMonitors := monitors[:1] - resp, err = cli.DeleteObject(context.Background(), deleteMonitors) + resp, err = cli.DeleteObject(context.Background(), deleteMonitors) assert.Nil(t, err) assert.Zero(t, resp.GetHeader().GetStatus().GetStatusCode()) assert.Empty(t, resp.GetHeader().GetStatus().GetErrMsg()) @@ -737,6 +873,91 @@ func TestStreamWrite(t *testing.T) { } } +func TestStreamUpdate(t *testing.T) { + loc, err := time.LoadLocation(timezone) + assert.Nil(t, err) + ts1 := time.Now().Add(-1 * time.Minute).UnixMilli() + time1 := time.UnixMilli(ts1).In(loc) + ts2 := time.Now().Add(-2 * time.Minute).UnixMilli() + time2 := time.UnixMilli(ts2).In(loc) + + monitors := []monitor{ + { + ID: randomId(), + Host: "127.0.0.1", + Memory: 1, + Cpu: 1.0, + Temperature: -1, + Ts: time1, + Running: true, + }, + { + ID: randomId(), + Host: "127.0.0.2", + Memory: 2, + Cpu: 2.0, + Temperature: -2, + Ts: time2, + Running: true, + }, + } + + table, err := tbl.New(monitorTableName) + assert.Nil(t, err) + + assert.Nil(t, table.AddTagColumn("id", types.INT64)) + assert.Nil(t, table.AddTagColumn("host", types.STRING)) + assert.Nil(t, table.AddFieldColumn("memory", types.UINT64)) + assert.Nil(t, table.AddFieldColumn("cpu", types.FLOAT64)) + assert.Nil(t, table.AddFieldColumn("temperature", types.INT64)) + assert.Nil(t, table.AddFieldColumn("running", types.BOOLEAN)) + assert.Nil(t, table.AddTimestampColumn("ts", types.TIMESTAMP_MILLISECOND)) + + for _, monitor := range monitors { + err := table.AddRow(monitor.ID, monitor.Host, + monitor.Memory, monitor.Cpu, monitor.Temperature, monitor.Running, + monitor.Ts) + assert.Nil(t, err) + } + + err = cli.StreamWrite(context.Background(), table) + assert.Nil(t, err) + affected, err := cli.CloseStream(context.Background()) + assert.EqualValues(t, 2, affected.GetValue()) + assert.Nil(t, err) + + // create a new table to update the monitor + utable, err := tbl.New(monitorTableName) + assert.Nil(t, err) + + assert.Nil(t, utable.AddTagColumn("id", types.INT64)) + assert.Nil(t, utable.AddTagColumn("host", types.STRING)) + assert.Nil(t, utable.AddFieldColumn("memory", types.UINT64)) + assert.Nil(t, utable.AddFieldColumn("cpu", types.FLOAT64)) + assert.Nil(t, utable.AddFieldColumn("temperature", types.INT64)) + assert.Nil(t, utable.AddFieldColumn("running", types.BOOLEAN)) + assert.Nil(t, utable.AddTimestampColumn("ts", types.TIMESTAMP_MILLISECOND)) + + monitors[0].Cpu = 1.1 + updatedMonitor := monitors[0] + + err = utable.AddRow(updatedMonitor.ID, updatedMonitor.Host, + updatedMonitor.Memory, updatedMonitor.Cpu, updatedMonitor.Temperature, updatedMonitor.Running, + updatedMonitor.Ts) + assert.Nil(t, err) + + err = cli.StreamWrite(context.Background(), utable) + assert.Nil(t, err) + affected, err = cli.CloseStream(context.Background()) + assert.EqualValues(t, uint32(1), affected.GetValue()) + assert.Nil(t, err) + + monitors_, err := db.Query(fmt.Sprintf("select * from %s where id = %d order by host asc", monitorTableName, updatedMonitor.ID)) + assert.Nil(t, err) + assert.Equal(t, 1, len(monitors_)) + assert.Equal(t, updatedMonitor, monitors_[0]) +} + func TestStreamDelete(t *testing.T) { loc, err := time.LoadLocation(timezone) assert.Nil(t, err) @@ -833,7 +1054,7 @@ func TestStreamDelete(t *testing.T) { } } -func TestStreamCreate(t *testing.T) { +func TestStreamWriteObj(t *testing.T) { loc, err := time.LoadLocation(timezone) assert.Nil(t, err) ts1 := time.Now().Add(-1 * time.Minute).UnixMilli() @@ -878,6 +1099,57 @@ func TestStreamCreate(t *testing.T) { } } +func TestStreamUpdateObj(t *testing.T) { + loc, err := time.LoadLocation(timezone) + assert.Nil(t, err) + ts1 := time.Now().Add(-1 * time.Minute).UnixMilli() + time1 := time.UnixMilli(ts1).In(loc) + ts2 := time.Now().Add(-2 * time.Minute).UnixMilli() + time2 := time.UnixMilli(ts2).In(loc) + + monitors := []monitor{ + { + ID: randomId(), + Host: "127.0.0.1", + Memory: 1, + Cpu: 1.0, + Temperature: -1, + Ts: time1, + Running: true, + }, + { + ID: randomId(), + Host: "127.0.0.2", + Memory: 2, + Cpu: 2.0, + Temperature: -2, + Ts: time2, + Running: true, + }, + } + + err = cli.StreamWriteObject(context.Background(), monitors) + assert.Nil(t, err) + affected, err := cli.CloseStream(context.Background()) + assert.EqualValues(t, uint32(len(monitors)), affected.GetValue()) + assert.Nil(t, err) + + monitors[0].Cpu = 1.1 + updatedMonitor := monitors[0] + + err = cli.StreamWriteObject(context.Background(), updatedMonitor) + assert.Nil(t, err) + affected, err = cli.CloseStream(context.Background()) + assert.EqualValues(t, uint32(1), affected.GetValue()) + assert.Nil(t, err) + + monitors_, err := db.Query(fmt.Sprintf("select * from %s where id = %d order by host asc", monitorTableName, updatedMonitor.ID)) + assert.Nil(t, err) + + assert.Equal(t, 1, len(monitors_)) + assert.Equal(t, updatedMonitor, monitors_[0]) +} + func TestStreamDeleteObj(t *testing.T) { loc, err := time.LoadLocation(timezone) assert.Nil(t, err)