Skip to content

Commit

Permalink
Merge branch 'master' into release-horizon-v0.25.0
Browse files Browse the repository at this point in the history
  • Loading branch information
bartekn committed Dec 10, 2019
2 parents 9bbfc3c + 8a892f1 commit 2efdfd9
Show file tree
Hide file tree
Showing 22 changed files with 1,119 additions and 51 deletions.
23 changes: 17 additions & 6 deletions README.md
@@ -1,9 +1,18 @@
# Stellar Go
[![Build Status](https://circleci.com/gh/stellar/go.svg?style=shield)](https://circleci.com/gh/stellar/go)
[![GoDoc](https://godoc.org/github.com/stellar/go?status.svg)](https://godoc.org/github.com/stellar/go)
[![Go Report Card](https://goreportcard.com/badge/github.com/stellar/go)](https://goreportcard.com/report/github.com/stellar/go)

This repo is the home for all of the public go code produced by SDF. In addition to various tools and services, this repository is the SDK from which you may develop your own applications that integrate with the stellar network.
<div align="center">
<a href="https://stellar.org"><img alt="Stellar" src="https://www.stellar.org/old-content/2019/03/stellar-logo-solo-1.png" width="558" /></a>
<br/>
<strong>Creating equitable access to the global financial system</strong>
<h1>Stellar Go Monorepo</h1>
</div>
<p align="center">
<a href="https://circleci.com/gh/stellar/go"><img alt="Build Status" src="https://circleci.com/gh/stellar/go.svg?style=shield" /></a>
<a href="https://godoc.org/github.com/stellar/go"><img alt="GoDoc" src="https://godoc.org/github.com/stellar/go?status.svg" /></a>
<a href="https://goreportcard.com/report/github.com/stellar/go"><img alt="Go Report Card" src="https://goreportcard.com/badge/github.com/stellar/go" /></a>
</p>

This repo is the home for all of the public Go code produced by the [Stellar Development Foundation].

This repo contains various tools and services that you can use and deploy, as well as the SDK you can use to develop applications that integrate with the Stellar network.

## Package Index

Expand Down Expand Up @@ -72,3 +81,5 @@ Contributions are welcome! See [CONTRIBUTING.md](CONTRIBUTING.md) for more detai
### Developing

See [DEVELOPING.md](DEVELOPING.md) for helpful instructions for getting started developing code in this repository.

[Stellar Development Foundation]: https://stellar.org
23 changes: 17 additions & 6 deletions exp/hubble/cmd/main.go
Expand Up @@ -7,26 +7,37 @@ import (
"fmt"

"github.com/stellar/go/exp/hubble"
"github.com/stellar/go/support/errors"
)

// If no configuration settings are provided, the default is that
// the user is running a standard local ElasticSearch setup.
const elasticSearchDefaultUrl = "http://127.0.0.1:9200"
const elasticSearchDefaultURL = "http://127.0.0.1:9200"

// Set a default generic index for ElasticSearch.
const elasticSearchDefaultIndex = "testindex"

func main() {
esUrlPtr := flag.String("esurl", elasticSearchDefaultUrl, "URL of running ElasticSearch server")
// TODO: Remove pipelineTypePtr flag once current state pipeline and elastic search are merged.
typeFlagStr := fmt.Sprintf("type of state pipeline, choices are %s and %s", hubble.PipelineDefaultType, hubble.PipelineSearchType)
pipelineTypePtr := flag.String("type", hubble.PipelineDefaultType, typeFlagStr)
esURLPtr := flag.String("esurl", elasticSearchDefaultURL, "URL of running ElasticSearch server")
esIndexPtr := flag.String("esindex", elasticSearchDefaultIndex, "index for ElasticSearch")
flag.Parse()
fmt.Println("Running the pipeline to serialize XDR entries...")
session, err := hubble.NewStatePipelineSession(*esUrlPtr, *esIndexPtr)

pipelineType := *pipelineTypePtr
// Validate that pipeline type is either "current" or "search".
if (pipelineType != hubble.PipelineDefaultType) && (pipelineType != hubble.PipelineSearchType) {
panic(errors.Errorf("invalid pipeline type %s, must be '%s' or '%s'", pipelineType, hubble.PipelineDefaultType, hubble.PipelineSearchType))
}

session, err := hubble.NewStatePipelineSession(pipelineType, *esURLPtr, *esIndexPtr)
if err != nil {
panic(err)
panic(errors.Wrap(err, "could not make new state pipeline session"))
}
fmt.Printf("Running state pipeline session of type %s\n", pipelineType)
err = session.Run()
if err != nil {
panic(err)
panic(errors.Wrap(err, "could not run session"))
}
}
51 changes: 40 additions & 11 deletions exp/hubble/pipeline.go
Expand Up @@ -14,15 +14,33 @@ import (

const archivesURL = "http://history.stellar.org/prd/core-live/core_live_001/"

// NewStatePipelineSession runs a single ledger session.
func NewStatePipelineSession(esUrl, esIndex string) (*ingest.SingleLedgerSession, error) {
// PipelineDefaultType is the default type of state pipeline.
// This will just collect and store the current ledger state in memory.
const PipelineDefaultType = "currentState"

// PipelineSearchType is the other choice for type of state pipeline.
// This state pipeline writes entries to a running Elasticsearch instance.
const PipelineSearchType = "elasticSearch"

// NewStatePipelineSession returns a single ledger state session.
func NewStatePipelineSession(pipelineType, esURL, esIndex string) (*ingest.SingleLedgerSession, error) {
archive, err := newArchive()
if err != nil {
return nil, errors.Wrap(err, "couldn't create archive")
return nil, errors.Wrap(err, "could not create archive")
}
statePipeline, err := newStatePipeline(esUrl, esIndex)
if err != nil {
return nil, errors.Wrap(err, "couldn't create state pipeline")
var statePipeline *pipeline.StatePipeline
if pipelineType == PipelineSearchType {
statePipeline, err = newElasticSearchPipeline(esURL, esIndex)
if err != nil {
return nil, errors.Wrap(err, "could not create elastic search pipeline")
}
} else if pipelineType == PipelineDefaultType {
statePipeline, err = newCurrentStatePipeline()
if err != nil {
return nil, errors.Wrap(err, "could not create current state pipeline")
}
} else {
return nil, errors.Errorf("invalid state pipeline type: %s, can only have current or state", pipelineType)
}
session := &ingest.SingleLedgerSession{
Archive: archive,
Expand All @@ -42,9 +60,20 @@ func newArchive() (*historyarchive.Archive, error) {
return archive, nil
}

func newStatePipeline(esUrl, esIndex string) (*pipeline.StatePipeline, error) {
func newCurrentStatePipeline() (*pipeline.StatePipeline, error) {
sp := &pipeline.StatePipeline{}
csProcessor := &CurrentStateProcessor{
ledgerState: make(map[string]accountState),
}
sp.SetRoot(
pipeline.StateNode(csProcessor),
)
return sp, nil
}

func newElasticSearchPipeline(esURL, esIndex string) (*pipeline.StatePipeline, error) {
sp := &pipeline.StatePipeline{}
client, err := newClientWithIndex(esUrl, esIndex)
client, err := newClientWithIndex(esURL, esIndex)
if err != nil {
return nil, errors.Wrap(err, "couldn't create new es client and index")
}
Expand All @@ -58,16 +87,16 @@ func newStatePipeline(esUrl, esIndex string) (*pipeline.StatePipeline, error) {
return sp, nil
}

func newClientWithIndex(esUrl, esIndex string) (*elastic.Client, error) {
func newClientWithIndex(esURL, esIndex string) (*elastic.Client, error) {
client, err := elastic.NewClient(
elastic.SetURL(esUrl),
elastic.SetURL(esURL),
)
if err != nil {
return nil, errors.Wrap(err, "couldn't create es client")
}

ctx := context.Background()
_, _, err = client.Ping(esUrl).Do(ctx)
_, _, err = client.Ping(esURL).Do(ctx)
if err != nil {
return nil, errors.Wrap(err, "couldn't ping es server")
}
Expand Down
56 changes: 56 additions & 0 deletions exp/hubble/processors.go
Expand Up @@ -8,6 +8,7 @@ import (
stdio "io"
"strconv"

"github.com/kr/pretty"
"github.com/olivere/elastic/v7"
"github.com/stellar/go/exp/ingest/io"
ingestPipeline "github.com/stellar/go/exp/ingest/pipeline"
Expand Down Expand Up @@ -86,3 +87,58 @@ func (p *ESProcessor) PutEntry(ctx context.Context, entry string, id int) error
_, err := p.client.Index().Index(p.index).Id(idStr).BodyString(entry).Do(ctx)
return err
}

// CurrentStateProcessor stores only the current state of each account
// on the ledger.
type CurrentStateProcessor struct {
ledgerState map[string]accountState
}

var _ ingestPipeline.StateProcessor = &CurrentStateProcessor{}

// ProcessState updates the global state using current entries.
func (p *CurrentStateProcessor) ProcessState(ctx context.Context, store *supportPipeline.Store, r io.StateReader, w io.StateWriter) error {
defer w.Close()
defer r.Close()

for {
entry, err := r.Read()
if err != nil {
if err == stdio.EOF {
break
} else {
return err
}
}

accountID, err := makeAccountID(&entry)
if err != nil {
return errors.Wrap(err, "could not get ledger account address")
}
currentState := p.ledgerState[accountID]
newState, err := makeNewAccountState(&currentState, &entry)
if err != nil {
return errors.Wrap(err, "could not update account state")
}
p.ledgerState[accountID] = *newState

select {
case <-ctx.Done():
return nil
default:
continue
}
}
fmt.Printf("%# v", pretty.Formatter(p.ledgerState))
return nil
}

// Reset makes the internal ledger state an empty map.
func (p *CurrentStateProcessor) Reset() {
p.ledgerState = make(map[string]accountState)
}

// Name returns the name of the processor.
func (p *CurrentStateProcessor) Name() string {
return "CSProcessor"
}
40 changes: 40 additions & 0 deletions exp/hubble/schema.go
@@ -0,0 +1,40 @@
// +build go1.13

package hubble

type accountState struct {
address string
seqnum uint32
balance uint32
signers []signer
trustlines map[string]trustline
offers map[uint32]offer
data map[string][]byte
// TODO: May want to track other fields in AccountEntry.
}

type signer struct {
address string
weight uint32
}

type trustline struct {
asset string
balance uint32
limit uint32
authorized bool
// TODO: Add liabilities.
}

// TODO: Save amount as a decimal instead of integer.
// TODO: Save decimal in addition to N and D.
type offer struct {
id uint32
seller string // seller address
selling string // selling asset
buying string // buying asset
amount uint32
priceNum uint16
priceDenom uint16
// TODO: Add flags.
}

0 comments on commit 2efdfd9

Please sign in to comment.