Skip to content

Commit

Permalink
WM cleanup (#1296)
Browse files Browse the repository at this point in the history
* add cleanup worker for wm

* add hdd tablespace

* add log for hdd path

* update default hdd path

* update owner

* add init db script

* we are already creating with tablespace in initdb.sh

* update initdb script.sh

* update sql for write_markers_archive table

---------

Co-authored-by: Manohar Reddy <b.manu199@gmail.com>
Co-authored-by: Yury <yuderbasov@gmail.com>
  • Loading branch information
3 people committed Oct 30, 2023
1 parent c5f3d41 commit 81fda56
Show file tree
Hide file tree
Showing 11 changed files with 96 additions and 35 deletions.
4 changes: 4 additions & 0 deletions code/go/0chain.net/blobber/config.go
Expand Up @@ -66,6 +66,10 @@ func setupConfig(configDir string, deploymentMode int) {
config.Configuration.DBUserName = viper.GetString("db.user")
config.Configuration.DBPassword = viper.GetString("db.password")
config.Configuration.DBTablesToKeep = viper.GetStringSlice("db.keep_tables")
config.Configuration.ArchiveDBPath = viper.GetString("db.archive_path")
if config.Configuration.ArchiveDBPath == "" {
config.Configuration.ArchiveDBPath = "/var/lib/postgresql/hdd"
}

config.Configuration.PriceInUSD = viper.GetBool("price_in_usd")

Expand Down
3 changes: 1 addition & 2 deletions code/go/0chain.net/blobber/datastore.go
Expand Up @@ -34,7 +34,6 @@ func setupDatabase() error {

time.Sleep(1 * time.Second)
}

if err := migrateDatabase(pgDB); err != nil {
return fmt.Errorf("error while migrating schema: %v", err)
}
Expand All @@ -49,7 +48,7 @@ func migrateDatabase(db *gorm.DB) error {
if err != nil {
return err
}

goose.Migrate(sqlDB)
return nil
}
1 change: 1 addition & 0 deletions code/go/0chain.net/blobbercore/config/config.go
Expand Up @@ -83,6 +83,7 @@ type Config struct {
DBUserName string
DBPassword string
DBTablesToKeep []string
ArchiveDBPath string
OpenConnectionWorkerFreq int64
OpenConnectionWorkerTolerance int64
WMRedeemFreq int64
Expand Down
Expand Up @@ -660,7 +660,7 @@ func (fsh *StorageHandler) CommitWrite(ctx context.Context, r *http.Request) (*b
writemarkerEntity.ClientPublicKey = clientKey

db := datastore.GetStore().GetTransaction(ctx)

writemarkerEntity.Latest = true
if err = db.Create(writemarkerEntity).Error; err != nil {
return nil, common.NewError("write_marker_error", "Error persisting the write marker")
}
Expand Down Expand Up @@ -1437,6 +1437,7 @@ func (fsh *StorageHandler) Rollback(ctx context.Context, r *http.Request) (*blob
a.FileMetaRoot = alloc.FileMetaRoot
a.IsRedeemRequired = alloc.IsRedeemRequired
}
writemarkerEntity.Latest = true
err = txn.Create(writemarkerEntity).Error
if err != nil {
txn.Rollback()
Expand Down
66 changes: 38 additions & 28 deletions code/go/0chain.net/blobbercore/writemarker/entity.go
Expand Up @@ -52,6 +52,7 @@ type WriteMarkerEntity struct {
CloseTxnID string `gorm:"column:close_txn_id;size:64"`
ConnectionID string `gorm:"column:connection_id;size:64"`
ClientPublicKey string `gorm:"column:client_key;size:256"`
Latest bool `gorm:"column:latest;not null;default:true"`
Sequence int64 `gorm:"column:sequence;unique;autoIncrement;<-:false;index:idx_seq,unique,priority:2"` // <-:false skips value insert/update by gorm
datastore.ModelWithTS
}
Expand All @@ -72,38 +73,47 @@ func (w *WriteMarkerEntity) BeforeSave(tx *gorm.DB) error {
}

func (wm *WriteMarkerEntity) UpdateStatus(ctx context.Context, status WriteMarkerStatus, statusMessage, redeemTxn string) (err error) {
db := datastore.GetStore().GetTransaction(ctx)
statusBytes, _ := json.Marshal(statusMessage)
err = datastore.GetStore().WithNewTransaction(func(ctx context.Context) error {
db := datastore.GetStore().GetTransaction(ctx)
statusBytes, _ := json.Marshal(statusMessage)

if status == Failed {
wm.ReedeemRetries++
err = db.Model(wm).Updates(WriteMarkerEntity{
Status: status,
StatusMessage: string(statusBytes),
CloseTxnID: redeemTxn,
ReedeemRetries: wm.ReedeemRetries,
}).Error
return err
}

if status == Failed {
wm.ReedeemRetries++
err = db.Model(wm).Updates(WriteMarkerEntity{
Status: status,
StatusMessage: string(statusBytes),
CloseTxnID: redeemTxn,
ReedeemRetries: wm.ReedeemRetries,
Status: status,
StatusMessage: string(statusBytes),
CloseTxnID: redeemTxn,
}).Error
return
}

err = db.Model(wm).Updates(WriteMarkerEntity{
Status: status,
StatusMessage: string(statusBytes),
CloseTxnID: redeemTxn,
}).Error
if err != nil {
return
}

// TODO (sfxdx): what about failed write markers ?
if status != Committed || wm.WM.Size <= 0 {
return // not committed or a deleting marker
}
if err != nil {
return err
}

err = db.Exec("UPDATE write_markers SET latest = false WHERE allocation_id = ? AND allocation_root = ? AND sequence < ?", wm.WM.AllocationID, wm.WM.PreviousAllocationRoot, wm.Sequence).Error
if err != nil {
return err
}

// TODO (sfxdx): what about failed write markers ?
if status != Committed || wm.WM.Size <= 0 {
return err // not committed or a deleting marker
}

// work on pre-redeemed tokens and write-pools balances tracking
if err := allocation.AddToPending(ctx, wm.WM.ClientID, wm.WM.AllocationID, -wm.WM.Size); err != nil {
return fmt.Errorf("can't save allocation pending value: %v", err)
}
return nil
})

// work on pre-redeemed tokens and write-pools balances tracking
if err := allocation.AddToPending(ctx, wm.WM.ClientID, wm.WM.AllocationID, -wm.WM.Size); err != nil {
return fmt.Errorf("can't save allocation pending value: %v", err)
}
return
}

Expand Down
8 changes: 4 additions & 4 deletions code/go/0chain.net/blobbercore/writemarker/protocol.go
Expand Up @@ -116,8 +116,8 @@ func (wme *WriteMarkerEntity) redeemMarker(ctx context.Context) error {
wme.Status = Committed
wme.StatusMessage = t.TransactionOutput
wme.CloseTxnID = t.Hash
err = wme.UpdateStatus(ctx, Committed, t.TransactionOutput, t.Hash)
return err
_ = wme.UpdateStatus(ctx, Committed, t.TransactionOutput, t.Hash)
return nil
}
}

Expand Down Expand Up @@ -165,8 +165,8 @@ func (wme *WriteMarkerEntity) redeemMarker(ctx context.Context) error {
wme.Status = Committed
wme.StatusMessage = t.TransactionOutput
wme.CloseTxnID = t.Hash
err = wme.UpdateStatus(ctx, Committed, t.TransactionOutput, t.Hash)
return err
_ = wme.UpdateStatus(ctx, Committed, t.TransactionOutput, t.Hash)
return nil
}

func (wme *WriteMarkerEntity) VerifyRollbackMarker(ctx context.Context, dbAllocation *allocation.Allocation, latestWM *WriteMarkerEntity) error {
Expand Down
26 changes: 26 additions & 0 deletions code/go/0chain.net/blobbercore/writemarker/worker.go
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/allocation"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
"github.com/0chain/blobber/code/go/0chain.net/core/logging"
"go.uber.org/zap"
"golang.org/x/sync/semaphore"
Expand All @@ -20,6 +21,11 @@ var (
mut sync.RWMutex
)

const (
timestampGap = 30 * 24 * 60 * 60 // 30 days
cleanupWorkerInterval = 24 * 7 * time.Hour // 7 days
)

func SetupWorkers(ctx context.Context) {
var res []allocation.Res

Expand All @@ -39,6 +45,7 @@ func SetupWorkers(ctx context.Context) {
}

go startRedeem(ctx)
go startCleanupWorker(ctx)
}

func GetLock(allocationID string) *semaphore.Weighted {
Expand Down Expand Up @@ -182,3 +189,22 @@ func tryAgain(wm *WriteMarkerEntity) {
func retryRedeem(errString string) bool {
return !strings.Contains(errString, "value not present")
}

func startCleanupWorker(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case <-time.After(cleanupWorkerInterval):
_ = datastore.GetStore().WithNewTransaction(func(ctx context.Context) error {
tx := datastore.GetStore().GetTransaction(ctx)
timestamp := int64(common.Now()) - timestampGap // 30 days
err := tx.Exec("INSERT INTO write_markers_archive (SELECT * from write_markers WHERE timestamp < ? AND latest = )", timestamp, false).Error
if err != nil {
return err
}
return tx.Exec("DELETE FROM write_markers WHERE timestamp < ? AND latest = )", timestamp, false).Error
})
}
}
}
1 change: 1 addition & 0 deletions config/0chain_blobber.yaml
Expand Up @@ -109,6 +109,7 @@ db:
password: blobber
host: postgres
port: 5432
archive_path: "/var/lib/postgresql/hdd"

storage:
files_dir: "/path/to/hdd"
Expand Down
3 changes: 3 additions & 0 deletions docker.local/b0docker-compose.yml
Expand Up @@ -8,10 +8,13 @@ services:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: secret
POSTGRES_HOST_AUTH_METHOD: trust
SLOW_TABLESPACE_PATH: /var/lib/postgresql/hdd # this should match with archive_path in 0chain_blobber.yaml
SLOW_TABLESPACE: hdd_tablespace # this should match with the dbs.events.slowtablespace in 0chain.yaml
#expose_ci_port
volumes:
- ./blobber${BLOBBER}/data/postgresql:/var/lib/postgresql/data
- ./sql_init:/docker-entrypoint-initdb.d
- ../blobber${SHARDER}/data/postgresql2:/var/lib/postgresql/hdd_ts
networks:
default:
validator:
Expand Down
8 changes: 8 additions & 0 deletions docker.local/sql_init/000-init-db.sh
@@ -0,0 +1,8 @@
#!/bin/bash
set -e

mkdir -p $SLOW_TABLESPACE_PATH

psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL
create tablespace $SLOW_TABLESPACE location '$SLOW_TABLESPACE_PATH';
EOSQL
8 changes: 8 additions & 0 deletions goose/migrations/001_blobber_meta.sql
Expand Up @@ -448,6 +448,7 @@ CREATE TABLE public.write_markers (
client_id character varying(64),
signature character varying(64),
status bigint DEFAULT 0 NOT NULL,
latest bool DEFAULT true NOT NULL,
status_message text,
redeem_retries bigint DEFAULT 0 NOT NULL,
close_txn_id character varying(64),
Expand All @@ -461,6 +462,13 @@ CREATE TABLE public.write_markers (

ALTER TABLE public.write_markers OWNER TO blobber_user;

--
-- Name: write_markers_archive; Type: TABLE; Schema: public; Owner: blobber_user
--

CREATE TABLE public.write_markers_archive AS TABLE public.write_markers WITH NO DATA;
ALTER TABLE public.write_markers_archive SET TABLESPACE hdd_tablespace;

--
-- Name: write_markers_sequence_seq; Type: SEQUENCE; Schema: public; Owner: blobber_user
--
Expand Down

0 comments on commit 81fda56

Please sign in to comment.