Skip to content

Commit

Permalink
Merge 2.28.0 back to mainline, stellar#5183
Browse files Browse the repository at this point in the history
  • Loading branch information
sreuland authored Jan 29, 2024
2 parents 77cb331 + 98d54d1 commit c33317d
Show file tree
Hide file tree
Showing 49 changed files with 1,424 additions and 654 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/horizon.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ jobs:
env:
HORIZON_INTEGRATION_TESTS_ENABLED: true
HORIZON_INTEGRATION_TESTS_CORE_MAX_SUPPORTED_PROTOCOL: ${{ matrix.protocol-version }}
PROTOCOL_20_CORE_DEBIAN_PKG_VERSION: 20.0.2-1633.669916b56.focal
PROTOCOL_20_CORE_DOCKER_IMG: stellar/unsafe-stellar-core:20.0.2-1633.669916b56.focal
PROTOCOL_20_SOROBAN_RPC_DOCKER_IMG: stellar/soroban-rpc:20.0.2-47
PROTOCOL_20_CORE_DEBIAN_PKG_VERSION: 20.1.0-1656.114b833e7.focal
PROTOCOL_20_CORE_DOCKER_IMG: stellar/unsafe-stellar-core:20.1.0-1656.114b833e7.focal
PROTOCOL_20_SOROBAN_RPC_DOCKER_IMG: stellar/soroban-rpc:20.2.0
PROTOCOL_19_CORE_DEBIAN_PKG_VERSION: 19.14.0-1500.5664eff4e.focal
PROTOCOL_19_CORE_DOCKER_IMG: stellar/stellar-core:19.14.0-1500.5664eff4e.focal
PGHOST: localhost
Expand Down Expand Up @@ -120,7 +120,7 @@ jobs:
key: ${{ env.COMBINED_SOURCE_HASH }}

- if: ${{ steps.horizon_binary_tests_hash.outputs.cache-hit != 'true' }}
run: go test -race -timeout 45m -v ./services/horizon/internal/integration/...
run: go test -race -timeout 65m -v ./services/horizon/internal/integration/...

- name: Save Horizon binary and integration tests source hash to cache
if: ${{ success() && steps.horizon_binary_tests_hash.outputs.cache-hit != 'true' }}
Expand Down
74 changes: 67 additions & 7 deletions historyarchive/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/url"
"path"
"regexp"
Expand Down Expand Up @@ -46,8 +45,9 @@ type ArchiveOptions struct {
// CheckpointFrequency is the number of ledgers between checkpoints
// if unset, DefaultCheckpointFrequency will be used
CheckpointFrequency uint32

storage.ConnectOptions
// CacheConfig controls how/if bucket files are cached on the disk.
CacheConfig CacheOptions
}

type Ledger struct {
Expand Down Expand Up @@ -75,6 +75,7 @@ type ArchiveInterface interface {
GetXdrStreamForHash(hash Hash) (*XdrStream, error)
GetXdrStream(pth string) (*XdrStream, error)
GetCheckpointManager() CheckpointManager
GetStats() []ArchiveStats
}

var _ ArchiveInterface = &Archive{}
Expand Down Expand Up @@ -103,6 +104,12 @@ type Archive struct {
checkpointManager CheckpointManager

backend storage.Storage
cache *ArchiveBucketCache
stats archiveStats
}

// GetStats returns the archive's accumulated request/download/cache
// counters, wrapped in a single-element slice to satisfy ArchiveInterface.
func (arch *Archive) GetStats() []ArchiveStats {
	stats := make([]ArchiveStats, 0, 1)
	stats = append(stats, &arch.stats)
	return stats
}

func (arch *Archive) GetCheckpointManager() CheckpointManager {
Expand All @@ -112,6 +119,7 @@ func (arch *Archive) GetCheckpointManager() CheckpointManager {
func (a *Archive) GetPathHAS(path string) (HistoryArchiveState, error) {
var has HistoryArchiveState
rdr, err := a.backend.GetFile(path)
a.stats.incrementDownloads()
if err != nil {
return has, err
}
Expand All @@ -138,6 +146,7 @@ func (a *Archive) GetPathHAS(path string) (HistoryArchiveState, error) {

func (a *Archive) PutPathHAS(path string, has HistoryArchiveState, opts *CommandOptions) error {
exists, err := a.backend.Exists(path)
a.stats.incrementRequests()
if err != nil {
return err
}
Expand All @@ -149,19 +158,21 @@ func (a *Archive) PutPathHAS(path string, has HistoryArchiveState, opts *Command
if err != nil {
return err
}
return a.backend.PutFile(path,
ioutil.NopCloser(bytes.NewReader(buf)))
a.stats.incrementUploads()
return a.backend.PutFile(path, io.NopCloser(bytes.NewReader(buf)))
}

// BucketExists reports whether the given bucket exists in the archive,
// consulting the local cache (when enabled) before hitting the backend.
//
// NOTE: the original span contained both the pre-change return
// (a.backend.Exists) and the post-change return (a.cachedExists) —
// diff residue producing two consecutive return statements, which is
// invalid Go. This keeps only the intended, cache-aware version.
func (a *Archive) BucketExists(bucket Hash) (bool, error) {
	return a.cachedExists(BucketPath(bucket))
}

// BucketSize returns the size in bytes of the named bucket file,
// recording the lookup in the archive's request counter.
func (a *Archive) BucketSize(bucket Hash) (int64, error) {
	a.stats.incrementRequests()
	bucketPath := BucketPath(bucket)
	return a.backend.Size(bucketPath)
}

// CategoryCheckpointExists reports whether the checkpoint file for the
// given category and checkpoint ledger exists in the backend.
func (a *Archive) CategoryCheckpointExists(cat string, chk uint32) (bool, error) {
	a.stats.incrementRequests()
	checkpointPath := CategoryCheckpointPath(cat, chk)
	return a.backend.Exists(checkpointPath)
}

Expand Down Expand Up @@ -294,14 +305,17 @@ func (a *Archive) PutRootHAS(has HistoryArchiveState, opts *CommandOptions) erro
}

// ListBucket streams the file names under the bucket directory matching
// the given directory prefix, along with any listing errors.
func (a *Archive) ListBucket(dp DirPrefix) (chan string, chan error) {
	a.stats.incrementRequests()
	prefix := path.Join("bucket", dp.Path())
	return a.backend.ListFiles(prefix)
}

// ListAllBuckets streams every file name under the archive's bucket
// directory, along with any listing errors.
func (a *Archive) ListAllBuckets() (chan string, chan error) {
	const bucketDir = "bucket"
	a.stats.incrementRequests()
	return a.backend.ListFiles(bucketDir)
}

func (a *Archive) ListAllBucketHashes() (chan Hash, chan error) {
a.stats.incrementRequests()
sch, errs := a.backend.ListFiles("bucket")
ch := make(chan Hash)
rx := regexp.MustCompile("bucket" + hexPrefixPat + "bucket-([0-9a-f]{64})\\.xdr\\.gz$")
Expand All @@ -322,6 +336,7 @@ func (a *Archive) ListCategoryCheckpoints(cat string, pth string) (chan uint32,
ext := categoryExt(cat)
rx := regexp.MustCompile(cat + hexPrefixPat + cat +
"-([0-9a-f]{8})\\." + regexp.QuoteMeta(ext) + "$")
a.stats.incrementRequests()
sch, errs := a.backend.ListFiles(path.Join(cat, pth))
ch := make(chan uint32)
errs = makeErrorPump(errs)
Expand Down Expand Up @@ -359,13 +374,42 @@ func (a *Archive) GetXdrStream(pth string) (*XdrStream, error) {
if !strings.HasSuffix(pth, ".xdr.gz") {
return nil, errors.New("File has non-.xdr.gz suffix: " + pth)
}
rdr, err := a.backend.GetFile(pth)
rdr, err := a.cachedGet(pth)
if err != nil {
return nil, err
}
return NewXdrGzStream(rdr)
}

// cachedGet fetches the file at pth, preferring the local disk cache when
// one is configured. Cache hits and backend downloads are tallied in the
// archive stats. On a cache-layer error, the entry is evicted and the
// fetch is retried directly against the backend.
func (a *Archive) cachedGet(pth string) (io.ReadCloser, error) {
	if a.cache != nil {
		rdr, hit, err := a.cache.GetFile(pth, a.backend)
		if hit {
			a.stats.incrementCacheHits()
		} else {
			a.stats.incrementDownloads()
		}
		if err == nil {
			return rdr, nil
		}

		// The cache layer failed; drop the entry and fall back to an
		// uncached fetch below.
		a.cache.Evict(pth)
	}

	a.stats.incrementDownloads()
	return a.backend.GetFile(pth)
}

// cachedExists reports whether pth exists, treating a cache hit as
// authoritative; anything else falls through to a (counted) backend check.
func (a *Archive) cachedExists(pth string) (bool, error) {
	if a.cache != nil {
		if a.cache.Exists(pth) {
			return true, nil
		}
	}

	a.stats.incrementRequests()
	return a.backend.Exists(pth)
}

func Connect(u string, opts ArchiveOptions) (*Archive, error) {
arch := Archive{
networkPassphrase: opts.NetworkPassphrase,
Expand All @@ -390,20 +434,36 @@ func Connect(u string, opts ArchiveOptions) (*Archive, error) {

var err error
arch.backend, err = ConnectBackend(u, opts.ConnectOptions)
return &arch, err
if err != nil {
return &arch, err
}

if opts.CacheConfig.Cache {
cache, innerErr := MakeArchiveBucketCache(opts.CacheConfig)
if innerErr != nil {
return &arch, innerErr
}

arch.cache = cache
}

arch.stats = archiveStats{backendName: u}
return &arch, nil
}

func ConnectBackend(u string, opts storage.ConnectOptions) (storage.Storage, error) {
if u == "" {
return nil, errors.New("URL is empty")
}

var err error
parsed, err := url.Parse(u)
if err != nil {
return nil, err
}

var backend storage.Storage

if parsed.Scheme == "mock" {
backend = makeMockBackend()
} else {
Expand Down
Loading

0 comments on commit c33317d

Please sign in to comment.