Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhancement/disk utilization #642

Merged
merged 81 commits into from
May 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
81 commits
Select commit Hold shift + click to select a range
2104470
Add config and worker for blobber updates
lpoli Apr 13, 2022
426f2f4
Manage files as per its lookuphash
lpoli Apr 13, 2022
8a794de
Use contenthash instead of lookuphash
lpoli Apr 13, 2022
46807f3
Replace file path calculation with new logic
lpoli Apr 13, 2022
a39b8bf
Fix file delete while moving file to cloud
lpoli Apr 15, 2022
8915266
Modify filestore setup
lpoli Apr 15, 2022
29e5584
Add/modify/optimize file operations as per file manager
lpoli Apr 15, 2022
7ff5571
Reorder functions and implement lock for file write
lpoli Apr 18, 2022
3055513
Restructure setup for testing
lpoli Apr 18, 2022
96fc5fb
Check if a path is a mountpoint
lpoli Apr 18, 2022
b1789f2
Fix flag and mount point check for integration
lpoli Apr 18, 2022
08de46c
Fix null scan error to int64 value
lpoli Apr 18, 2022
596be98
Add negating build tag to unit tests
lpoli Apr 19, 2022
59e3b89
Revert use of build tags
lpoli Apr 19, 2022
9ecc5e4
Fix/Modify download integration test
lpoli Apr 19, 2022
6fbb9a9
Skip integration test if not set in flag
lpoli Apr 19, 2022
8c57dfc
Add build tags to separate integration and unit test partially
lpoli Apr 20, 2022
0096ca0
Merge branch 'staging' into enhancement/disk-utilization
lpoli Apr 20, 2022
32d32e8
Merge branch 'staging' into enhancement/disk-utilization
lpoli Apr 24, 2022
3ee710b
Distinguish from integration test
lpoli Apr 25, 2022
7c73b42
Update comment, delete unused function
lpoli Apr 25, 2022
b5e8642
Add minor code optimization
lpoli Apr 25, 2022
2971e43
Validate actual file content hash w.r.t. clien'ts content hash
lpoli Apr 25, 2022
512da98
Delete connection object after its usage
lpoli Apr 25, 2022
c0cc845
Optimize file write to temporary directory
lpoli Apr 26, 2022
17b5caf
Remove some code redundancy
lpoli Apr 26, 2022
edaad7f
Readability change
lpoli Apr 27, 2022
5cd4073
Modify config for minio setup
lpoli Apr 27, 2022
2fbeba8
Reorganize/Reorder storage related code
lpoli Apr 27, 2022
82c7e75
Reorder argument of Filestorer interface
lpoli Apr 27, 2022
f465f1a
Add temporary fix for tests
lpoli Apr 27, 2022
db801b5
Change flag name
lpoli Apr 27, 2022
64d57fe
Update test
lpoli Apr 28, 2022
4260f3f
Add filestore mocker and update accordingly
lpoli Apr 28, 2022
5343fd2
Add file store mocker for tests in allocation package
lpoli Apr 28, 2022
4ae3932
Add unit test for filestore
lpoli Apr 28, 2022
f7afa27
Add/Modify/Fix/Optimize code
lpoli Apr 28, 2022
791bec3
Fix commit/integration tests issues
lpoli Apr 29, 2022
14267f9
Merge branch 'staging' into enhancement/disk-utilization
lpoli Apr 29, 2022
af60f36
Add/Optimize update blobber capacity and other functions
lpoli Apr 29, 2022
549f3ff
Add tag
lpoli Apr 29, 2022
61dd2fb
Merge branch 'enhancement/disk-utilization' of https://github.com/0ch…
lpoli Apr 29, 2022
a80f1f6
Delete unnecessary file
lpoli Apr 29, 2022
616bcbe
Fix variable name
lpoli Apr 29, 2022
cf1cf9a
Remove unused function
lpoli Apr 29, 2022
451e916
Relax mountpoint requirement
lpoli Apr 29, 2022
6b85e50
Fix mountpoint check temporarily
lpoli Apr 29, 2022
a1131a1
Merge branch 'staging' into enhancement/disk-utilization
lpoli May 2, 2022
562c0cd
Modify yaml file for custom docker build
lpoli May 2, 2022
7453116
Modify content hash calculation and size increment
lpoli May 3, 2022
abb2565
Fix size update
lpoli May 4, 2022
32b4b39
Fix for padding less data
lpoli May 4, 2022
2dab6ef
Move to files_dir flag temporarily for system testing
lpoli May 5, 2022
49c6470
Fix file store unit test
lpoli May 5, 2022
c74fbfb
Merge branch 'staging' into enhancement/disk-utilization
lpoli May 5, 2022
f9a2f67
Modify comparison to length
lpoli May 6, 2022
dfc0d77
Merge branch 'staging' into enhancement/disk-utilization
lpoli May 6, 2022
2ef2d7f
Fix validation/test
lpoli May 6, 2022
a05f4bc
Change back to what staging has
lpoli May 6, 2022
1087461
Change error message
lpoli May 6, 2022
5e0add9
Calculate different content hash for thumbnail
lpoli May 7, 2022
f73642c
Skip thumbnail hash checking
lpoli May 7, 2022
65671e7
Fix hash assignment issue
lpoli May 8, 2022
572c361
Add comments and modify field
lpoli May 10, 2022
ea2609c
Merge branch 'staging' into enhancement/disk-utilization
lpoli May 10, 2022
d9d0d2b
Disable lint check
lpoli May 11, 2022
6493e77
Merge branch 'enhancement/disk-utilization' of https://github.com/0ch…
lpoli May 11, 2022
46b74d8
Rename function
lpoli May 11, 2022
a5c0d8b
Use helper function to check if test is integration
lpoli May 11, 2022
106de2a
Fix return issues
lpoli May 11, 2022
25626ad
Rename functions
lpoli May 11, 2022
43dcbb0
Remove defer for smaller code blocks
lpoli May 12, 2022
b9cebdb
Modify viper's config retrieval function call
lpoli May 12, 2022
1e0a7fa
Modify function to return error and avoid using go routine
lpoli May 12, 2022
0a8d1f3
Fix double unlock call
lpoli May 12, 2022
c70eda1
Query in batches rather than single query
lpoli May 12, 2022
80db660
Fix lint issue
lpoli May 13, 2022
eabffdf
Fix address issue
lpoli May 13, 2022
f507569
Set allocations in batch
lpoli May 13, 2022
b9c9b2b
Merge branch 'staging' into enhancement/disk-utilization
lpoli May 13, 2022
c814373
Rename function
lpoli May 13, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Expand Up @@ -103,4 +103,4 @@ jobs:
#sudo make integration-tests
go=$(which go)
root=$(pwd)
sudo CGO_ENABLED=1 root=$root integration=1 $go test -tags bn256 ./...
sudo CGO_ENABLED=1 root=$root integration=1 $go test -tags bn256 -tags=integration ./...
27 changes: 25 additions & 2 deletions code/go/0chain.net/blobber/config.go
Expand Up @@ -20,6 +20,17 @@ func setupConfig(configDir string, deploymentMode int) {
// setup config file
config.SetupConfig(configDir)

if mountPoint != "" {
config.Configuration.MountPoint = mountPoint
} else {
config.Configuration.MountPoint = viper.GetString("storage.files_dir")
}

if config.Configuration.MountPoint == "" {
panic("Please specify mount point in flag or config file")
}
config.Configuration.AllocDirLevel = viper.GetIntSlice("storage.alloc_dir_level")
config.Configuration.FileDirLevel = viper.GetIntSlice("storage.file_dir_level")
config.Configuration.DeploymentMode = byte(deploymentMode)
config.Configuration.ChainID = viper.GetString("server_chain.id")
config.Configuration.SignatureScheme = viper.GetString("server_chain.signature_scheme")
Expand All @@ -40,16 +51,28 @@ func setupConfig(configDir string, deploymentMode int) {
config.Configuration.ChallengeResolveNumWorkers = viper.GetInt("challenge_response.num_workers")
config.Configuration.ChallengeMaxRetires = viper.GetInt("challenge_response.max_retries")

config.Configuration.AutomaticUpdate = viper.GetBool("disk_update.automatic_update")
blobberUpdateIntrv := viper.GetDuration("disk_update.blobber_update_interval")
if blobberUpdateIntrv <= 0 {
blobberUpdateIntrv = 5 * time.Minute
}
config.Configuration.BlobberUpdateInterval = blobberUpdateIntrv

config.Configuration.ColdStorageMinimumFileSize = viper.GetInt64("cold_storage.min_file_size")
config.Configuration.ColdStorageTimeLimitInHours = viper.GetInt64("cold_storage.file_time_limit_in_hours")
config.Configuration.ColdStorageJobQueryLimit = viper.GetInt64("cold_storage.job_query_limit")
config.Configuration.ColdStorageStartCapacitySize = viper.GetInt64("cold_storage.start_capacity_size")
config.Configuration.ColdStorageJobQueryLimit = viper.GetInt("cold_storage.job_query_limit")
config.Configuration.ColdStorageStartCapacitySize = viper.GetUint64("cold_storage.start_capacity_size")
config.Configuration.ColdStorageDeleteLocalCopy = viper.GetBool("cold_storage.delete_local_copy")
config.Configuration.ColdStorageDeleteCloudCopy = viper.GetBool("cold_storage.delete_cloud_copy")

config.Configuration.MinioStart = viper.GetBool("minio.start")
config.Configuration.MinioWorkerFreq = viper.GetInt64("minio.worker_frequency")
config.Configuration.MinioUseSSL = viper.GetBool("minio.use_ssl")
config.Configuration.MinioStorageUrl = viper.GetString("minio.storage_service_url")
config.Configuration.MinioAccessID = viper.GetString("minio.access_id")
config.Configuration.MinioSecretKey = viper.GetString("minio.secret_access_key")
config.Configuration.MinioBucket = viper.GetString("minio.bucket_name")
config.Configuration.MinioRegion = viper.GetString("minio.region")

config.Configuration.DBAutoMigrate = viper.GetBool("db.automigrate")
config.Configuration.PGUserName = viper.GetString("pg.user")
Expand Down
18 changes: 13 additions & 5 deletions code/go/0chain.net/blobber/filestore.go
Expand Up @@ -6,14 +6,22 @@ import (
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/filestore"
)

var fsStore filestore.FileStore //nolint:unused // global which might be needed somewhere

func setupFileStore() (err error) {
fmt.Print("> setup file store")
var fs filestore.FileStorer
if isIntegrationTest {
fs = &filestore.MockStore{}
} else {
fs = &filestore.FileStore{}

}

fsStore, err = filestore.SetupFSStore(filesDir + "/files")
err = fs.Initialize()
if err != nil {
return
}

fmt.Print(" [OK]\n")
filestore.SetFileStore(fs)

return err
return nil
}
8 changes: 2 additions & 6 deletions code/go/0chain.net/blobber/flags.go
Expand Up @@ -10,7 +10,7 @@ var (
deploymentMode int
keysFile string
minioFile string
filesDir string
mountPoint string
metadataDB string
logDir string
httpPort int
Expand All @@ -28,7 +28,7 @@ func init() {
flag.IntVar(&deploymentMode, "deployment_mode", 2, "deployment mode: 0=dev,1=test, 2=mainnet")
flag.StringVar(&keysFile, "keys_file", "", "keys_file")
flag.StringVar(&minioFile, "minio_file", "", "minio_file")
flag.StringVar(&filesDir, "files_dir", "", "files_dir")
flag.StringVar(&mountPoint, "files_dir", "", "Mounted partition where all files will be stored")
flag.StringVar(&metadataDB, "db_dir", "", "db_dir")
flag.StringVar(&logDir, "log_dir", "", "log_dir")
flag.IntVar(&httpPort, "port", 0, "port")
Expand All @@ -47,10 +47,6 @@ func parseFlags() {
fmt.Print("> load flags")
flag.Parse()

peterlimg marked this conversation as resolved.
Show resolved Hide resolved
if filesDir == "" {
panic("Please specify --files_dir absolute folder name option where uploaded files can be stored")
}

if metadataDB == "" {
panic("Please specify --db_dir absolute folder name option where meta data db can be stored")
}
Expand Down
5 changes: 0 additions & 5 deletions code/go/0chain.net/blobber/main.go
Expand Up @@ -23,11 +23,6 @@ func main() {
panic(err)
}

if err := setupMinio(); err != nil {
logging.Logger.Error("Error setting up minio " + err.Error())
panic(err)
}

if err := setupNode(); err != nil {
logging.Logger.Error("Error setting up blobber node " + err.Error())
panic(err)
Expand Down
61 changes: 0 additions & 61 deletions code/go/0chain.net/blobber/minio.go

This file was deleted.

38 changes: 36 additions & 2 deletions code/go/0chain.net/blobber/worker.go
@@ -1,12 +1,14 @@
package main

import (
"context"
"time"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/allocation"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/challenge"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/config"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/filestore"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/handler"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/readmarker"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/writemarker"
Expand All @@ -22,8 +24,10 @@ func setupWorkers() {
challenge.SetupWorkers(root)
readmarker.SetupWorkers(root)
writemarker.SetupWorkers(root)
allocation.StartUpdateWorker(root,
config.Configuration.UpdateAllocationsInterval)
allocation.StartUpdateWorker(root, config.Configuration.UpdateAllocationsInterval)
if config.Configuration.AutomaticUpdate {
go StartUpdateWorker(root, config.Configuration.BlobberUpdateInterval)
}
}

func refreshPriceOnChain() {
Expand Down Expand Up @@ -64,3 +68,33 @@ func startRefreshSettings() {
<-time.After(REPEAT_DELAY * time.Second)
}
}

func StartUpdateWorker(ctx context.Context, interval time.Duration) {
err := filestore.GetFileStore().CalculateCurrentDiskCapacity()
if err != nil {
panic(err)
}
currentCapacity := filestore.GetFileStore().GetCurrentDiskCapacity()

ticker := time.NewTicker(config.Configuration.BlobberUpdateInterval)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
err := filestore.GetFileStore().CalculateCurrentDiskCapacity()
if err != nil {
logging.Logger.Error("Error while getting capacity", zap.Error(err))
break
}
if currentCapacity != filestore.GetFileStore().GetCurrentDiskCapacity() {

err := handler.UpdateBlobberOnChain(ctx)
if err != nil {
logging.Logger.Error("Error while updating blobber updates on chain", zap.Error(err))
}
}
}
}

}
61 changes: 33 additions & 28 deletions code/go/0chain.net/blobber/zcn.go
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/config"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/filestore"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/handler"
"github.com/0chain/blobber/code/go/0chain.net/core/chain"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
Expand All @@ -23,42 +24,46 @@ func setupOnChain() {
fmt.Print(" + connect to miners: ")
if isIntegrationTest {
fmt.Print(" [SKIP]\n")
} else {
if err := handler.WalletRegister(); err != nil {
fmt.Println(err.Error() + "\n")
panic(err)
}
fmt.Print(" [OK]\n")
return
}

if err := handler.WalletRegister(); err != nil {
fmt.Println(err.Error() + "\n")
panic(err)
}
fmt.Print(" [OK]\n")

var success bool
var err error
// setup blobber (add or update) on the blockchain (multiple attempts)
for i := 1; i <= 10; i++ {
if i == 1 {
fmt.Printf("\r + connect to sharders:")
} else {
fmt.Printf("\r + [%v/10]connect to sharders:", i)
fmt.Printf("\r + [%v/10]connect to sharders:", i)
if err = filestore.GetFileStore().CalculateCurrentDiskCapacity(); err != nil {
fmt.Print("\n ", err.Error()+"\n")
goto sleep
}

if isIntegrationTest {
fmt.Print(" [SKIP]\n")
break
} else {
if err := handler.RegisterBlobber(common.GetRootContext()); err != nil {
if i == 10 { // no more attempts
panic(err)
}
fmt.Print("\n ", err.Error()+"\n")
} else {
fmt.Print(" [OK]\n")
break
}
for n := 0; n < ATTEMPT_DELAY; n++ {
<-time.After(1 * time.Second)

fmt.Printf("\r - wait %v seconds to retry", ATTEMPT_DELAY-n)
}
if err = handler.RegisterBlobber(common.GetRootContext()); err != nil {
fmt.Print("\n ", err.Error()+"\n")
goto sleep
}

fmt.Print(" [OK]\n")
success = true
break

sleep:
for n := 0; n < ATTEMPT_DELAY; n++ {
<-time.After(1 * time.Second)

fmt.Printf("\r - wait %v seconds to retry", ATTEMPT_DELAY-n)
}
}

if !success {
panic(err)
}

if !isIntegrationTest {
go setupWorkers()

Expand Down