Skip to content

Commit

Permalink
[review change]: remove unused code, change view schema, fix the way …
Browse files Browse the repository at this point in the history
…getting new txn
  • Loading branch information
gouhongshen committed Oct 27, 2023
1 parent 2575bfe commit 3521a8b
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 51 deletions.
40 changes: 4 additions & 36 deletions pkg/frontend/show_account.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"math"
"strconv"
"strings"
"time"
)

const (
Expand Down Expand Up @@ -71,25 +70,7 @@ const (
idxOfSuspendedTime = 4
idxOfComment = 5

//getTableStatsFormat = "select " +
// "( select " +
// " mu2.user_name as `admin_name` " +
// " from mo_catalog.mo_user as mu2 join " +
// " ( select " +
// " min(user_id) as `min_user_id` " +
// " from mo_catalog.mo_user " +
// " ) as mu1 on mu2.user_id = mu1.min_user_id " +
// ") as `admin_name`, " +
// "count(distinct mt.reldatabase) as `db_count`, " +
// "count(distinct mt.relname) as `table_count`, " +
// "sum(mo_table_rows(mt.reldatabase,mt.relname)) as `row_count`, " +
// "cast(sum(mo_table_size(mt.reldatabase,mt.relname))/1048576 as decimal(29,3)) as `size` " +
// "from " +
// "mo_catalog.mo_tables as mt " +
// "where mt.relkind != '%s' and mt.account_id = %d;"

// have excluded the `mo_table_rows` and `mo_table_size`, compared to getTableStatsFormat
// and left the `size_in_mb` as a placeholder
// left the `size` as a placeholder, the value will be embedding later
getTableStatsFormatV2 = "select " +
"( select " +
" mu2.user_name as `admin_name` " +
Expand All @@ -101,7 +82,7 @@ const (
") as `admin_name`, " +
"count(distinct mt.reldatabase) as `db_count`, " +
"count(distinct mt.relname) as `table_count`, " +
"cast(0 as double) as `size_in_mb` " +
"cast(0 as double) as `size` " +
"from " +
"mo_catalog.mo_tables as mt " +
"where mt.relkind != '%s' and mt.account_id = %d;"
Expand Down Expand Up @@ -157,22 +138,10 @@ func requestStorageUsage(ctx context.Context, ses *Session) (resp interface{}, e
return usage, nil
}

// two source:
// 1. sql client
// 2. internal
if ses.proc.TxnOperator == nil {
defer func() { // without this: the transaction xxx has been committed or aborted
ses.proc.TxnOperator = nil
}()
if _, ses.proc.TxnOperator, err = ses.TxnCreate(); err != nil {
return nil, err
}
if ses.proc.Ctx, ses.proc.TxnOperator, err = ses.txnHandler.GetTxn(); err != nil {
return nil, err
}

var cancel context.CancelFunc
ses.proc.Ctx, cancel = context.WithTimeout(ctx, time.Second*5)
defer cancel()

handler := ctl2.GetTNHandlerFunc(ctl.CmdMethod_StorageUsage, whichTN, payload, responseUnmarshaler)
result, err := handler(ses.proc, "DN", "", ctl2.MoCtlTNCmdSender)
if err != nil {
Expand Down Expand Up @@ -537,7 +506,6 @@ func mergeOutputResult(ses *Session, outputBatch *batch.Batch, rsOfMoAccount *ba
if err != nil {
return err
}
//err = outputBatch.Vecs[finalIdxOfRowCount].UnionOne(bat.Vecs[idxOfRowCount], 0, mp)
if err != nil {
return err
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/objectio/metav2.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ const (
SchemaTombstone DataMetaType = 1

CkpMetaStart DataMetaType = 2
// TODO(ghs) check here
CkpMetaEnd DataMetaType = 27 + 2

// CkpMetaEnd = CkpMetaStart + `MaxIDX`
CkpMetaEnd DataMetaType = 26 + 2
)

const (
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/metric/mometric/cron_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ const (
ShowAllAccountSQL = "SHOW ACCOUNTS;"
ShowAccountSQL = "SHOW ACCOUNTS like %q;"
ColumnAccountName = "account_name" // result column in `show accounts`, or column in table mo_catalog.mo_account
ColumnSize = "size_in_mb" // result column in `show accounts`, or column in table mo_catalog.mo_account
ColumnSize = "size" // result column in `show accounts`, or column in table mo_catalog.mo_account
ColumnCreatedTime = "created_time" // column in table mo_catalog.mo_account
ColumnStatus = "status" // column in table mo_catalog.mo_account
)
Expand Down
3 changes: 3 additions & 0 deletions pkg/vm/engine/disttae/txn_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package disttae

import (
"context"
"fmt"
"strconv"
"time"

Expand Down Expand Up @@ -560,6 +561,8 @@ func (tbl *txnTable) Ranges(ctx context.Context, exprs []*plan.Expr) (ranges [][
v2.TxnTableRangeDurationHistogram.Observe(time.Since(start).Seconds())
}()

fmt.Println("-------------- ctx ", ctx)

tbl.writes = tbl.writes[:0]
tbl.writesOffset = tbl.db.txn.statements[tbl.db.txn.statementID-1]

Expand Down
33 changes: 22 additions & 11 deletions pkg/vm/engine/tae/catalog/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,6 @@ func (s *SegStat) loadObjectInfo(name string, blk *BlockEntry) error {

s.loaded = true

// after loading object info, original size is still 0, we have to estimate it by experience
if name == motrace.RawLogTbl && s.originSize == 0 && s.rows != 0 {
factor := 1 + s.rows/1600
s.originSize = (1 << 20) * factor
}

return nil
}

Expand All @@ -118,14 +112,14 @@ func (s *SegStat) GetSortKeyZonemap() index.ZM {
}

func (s *SegStat) SetRows(rows int) {
s.RLock()
defer s.RUnlock()
s.Lock()
defer s.Unlock()
s.rows = rows
}

func (s *SegStat) SetRemainingRows(rows int) {
s.RLock()
defer s.RUnlock()
s.Lock()
defer s.Unlock()
s.remainingRows = rows
}

Expand All @@ -147,6 +141,12 @@ func (s *SegStat) GetOriginSize() int {
return s.originSize
}

func (s *SegStat) SetOriginSize(size int) {
s.Lock()
defer s.Unlock()
s.originSize = size
}

func (s *SegStat) GetCompSize() int {
s.RLock()
defer s.RUnlock()
Expand Down Expand Up @@ -247,9 +247,11 @@ func (entry *SegmentEntry) GetFirstBlkEntry() *BlockEntry {
entry.RLock()
defer entry.RUnlock()

// TODO why nil? print log
// head may be nil
head := entry.link.GetHead()
if head == nil {
fmt.Println("--------------------- link head is nil \n", entry)
return nil
}

Expand All @@ -267,12 +269,21 @@ func (entry *SegmentEntry) Less(b *SegmentEntry) int {

// LoadObjectInfo is called only in merge scanner goroutine, no need to hold lock
func (entry *SegmentEntry) LoadObjectInfo() error {
name := entry.GetTable().GetLastestSchema().Name
defer func() {
// after loading object info, original size is still 0, we have to estimate it by experience
rows := entry.Stat.GetRows()
if name == motrace.RawLogTbl && entry.Stat.GetOriginSize() == 0 && rows != 0 {
factor := 1 + rows/1600
entry.Stat.SetOriginSize((1 << 20) * factor)
}
}()

if entry.Stat.GetLoaded() {
return nil
}

// special case for raw log table.
name := entry.GetTable().GetLastestSchema().Name
if name == motrace.RawLogTbl &&
len(entry.table.entries) > int(common.RuntimeNotLoadMoreThan.Load()) {
return nil
Expand Down
1 change: 1 addition & 0 deletions pkg/vm/engine/tae/db/test/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ func Test_FillSEGStorageUsageBat(t *testing.T) {
collector.SegmentFn = func(segment *catalog.SegmentEntry) error {
logtail.FillSEGStorageUsageBat(collector.BaseCollector, segment)

require.NotNil(t, segment.GetFirstBlkEntry())
if !segment.IsAppendable() {
require.Equal(t, true, segment.Stat.GetLoaded())
require.NotEqual(t, int(0), segment.Stat.GetOriginSize())
Expand Down
2 changes: 1 addition & 1 deletion pkg/vm/engine/tae/logtail/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -2444,7 +2444,7 @@ func (collector *GlobalCollector) VisitTable(entry *catalog.TableEntry) error {
// every checkpoint records the full datasets of the storage usage info.
// [account_id, db_id, table_id, table_total_size_in_bytes]
func FillSEGStorageUsageBat(collector *BaseCollector, entry *catalog.SegmentEntry) {
if !entry.IsSortedLocked() || entry.IsAppendable() || entry.HasDropCommitted() {
if !entry.IsSorted() || entry.IsAppendable() || entry.HasDropCommitted() {
return
}

Expand Down

0 comments on commit 3521a8b

Please sign in to comment.