Skip to content
Permalink
Browse files

codeclimate

License: MIT
Signed-off-by: Hector Sanjuan <code@hector.link>
  • Loading branch information...
hsanjuan committed Jul 24, 2018
1 parent 327a81b commit aa5589d0d8c4b3ff67b267bf412f7c266b90ae1d
Showing with 162 additions and 139 deletions.
  1. +5 −2 .codeclimate.yml
  2. +41 −35 adder/importer.go
  3. +38 −34 adder/params.go
  4. +23 −16 adder/sharding/cluster_dag_builder.go
  5. +15 −12 adder/sharding/dag.go
  6. +2 −1 adder/sharding/verify.go
  7. +2 −2 api/rest/client/request.go
  8. +36 −37 cluster.go
@@ -8,15 +8,18 @@ checks:
threshold: 500
method-complexity:
config:
threshold: 12
threshold: 15
method-lines:
config:
threshold: 80
similar-code:
enabled: false
return-statements:
config:
threshold: 10
threshold: 10
argument-count:
config:
threshold: 6

engines:
fixme:
@@ -74,6 +74,46 @@ func (imp *Importer) start() bool {
return !retVal
}

func (imp *Importer) addFile(ipfsAdder *ipfsadd.Adder) error {
f, err := imp.files.NextFile()
if err != nil {
return err
}

logger.Debugf("ipfsAdder AddFile(%s)", f.FullPath())
return ipfsAdder.AddFile(f)
}

func (imp *Importer) addFiles(ctx context.Context, ipfsAdder *ipfsadd.Adder) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
defer close(imp.output)
defer close(imp.blocks)
defer close(imp.errors)

for {
select {
case <-ctx.Done():
imp.errors <- ctx.Err()
return
default:
err := imp.addFile(ipfsAdder)
if err != nil {
if err == io.EOF {
goto FINALIZE
}
imp.errors <- err
return
}
}
}
FINALIZE:
_, err := ipfsAdder.Finalize()
if err != nil {
imp.errors <- err
}
}

// Go starts a goroutine which reads the blocks as outputted by the
// ipfsadd module called with the parameters of this importer. The blocks,
// errors and output are placed in the respective importer channels for
@@ -98,41 +138,7 @@ func (imp *Importer) Go(ctx context.Context) error {
ipfsAdder.Chunker = imp.params.Chunker
ipfsAdder.Out = imp.output

go func() {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
defer close(imp.output)
defer close(imp.blocks)
defer close(imp.errors)

for {
select {
case <-ctx.Done():
imp.errors <- ctx.Err()
return
default:
f, err := imp.files.NextFile()
if err != nil {
if err == io.EOF {
goto FINALIZE // time to finalize
}
imp.errors <- err
return
}

logger.Debugf("ipfsAdder AddFile(%s)", f.FullPath())
if err := ipfsAdder.AddFile(f); err != nil {
imp.errors <- err
return
}
}
}
FINALIZE:
_, err := ipfsAdder.Finalize()
if err != nil {
imp.errors <- err
}
}()
go imp.addFiles(ctx, ipfsAdder)
return nil
}

@@ -2,6 +2,7 @@ package adder

import (
"errors"
"fmt"
"net/url"
"strconv"

@@ -40,6 +41,28 @@ func DefaultParams() *Params {
}
}

func parseBoolParam(q url.Values, name string, dest *bool) error {
if v := q.Get(name); v != "" {
b, err := strconv.ParseBool(v)
if err != nil {
return fmt.Errorf("parameter %s invalid", name)
}
*dest = b
}
return nil
}

func parseIntParam(q url.Values, name string, dest *int) error {
if v := q.Get(name); v != "" {
i, err := strconv.Atoi(v)
if err != nil {
return fmt.Errorf("parameter %s invalid", name)
}
*dest = i
}
return nil
}

// ParamsFromQuery parses the Params object from
// a URL.Query().
func ParamsFromQuery(query url.Values) (*Params, error) {
@@ -61,44 +84,25 @@ func ParamsFromQuery(query url.Values) (*Params, error) {
name := query.Get("name")
params.Name = name

if v := query.Get("raw"); v != "" {
raw, err := strconv.ParseBool(v)
if err != nil {
return nil, errors.New("parameter raw invalid")
}
params.RawLeaves = raw
err := parseBoolParam(query, "raw", &params.RawLeaves)
if err != nil {
return nil, err
}

if v := query.Get("hidden"); v != "" {
hidden, err := strconv.ParseBool(v)
if err != nil {
return nil, errors.New("parameter hidden invalid")
}
params.Hidden = hidden
err = parseBoolParam(query, "hidden", &params.Hidden)
if err != nil {
return nil, err
}

if v := query.Get("shard"); v != "" {
shard, err := strconv.ParseBool(v)
if err != nil {
return nil, errors.New("parameter shard invalid")
}
params.Shard = shard
err = parseBoolParam(query, "shard", &params.Shard)
if err != nil {
return nil, err
}

if v := query.Get("repl_min"); v != "" {
replMin, err := strconv.Atoi(v)
if err != nil || replMin < -1 {
return nil, errors.New("parameter repl_min invalid")
}
params.ReplicationFactorMin = replMin
err = parseIntParam(query, "repl_min", &params.ReplicationFactorMin)
if err != nil {
return nil, err
}

if v := query.Get("repl_max"); v != "" {
replMax, err := strconv.Atoi(v)
if err != nil || replMax < -1 {
return nil, errors.New("parameter repl_max invalid")
}
params.ReplicationFactorMax = replMax
err = parseIntParam(query, "repl_max", &params.ReplicationFactorMax)
if err != nil {
return nil, err
}

if v := query.Get("shard_size"); v != "" {
@@ -188,32 +188,39 @@ func (cdb *clusterDAGBuilder) finalize() error {
return nil
}

// returns the value for continue in ingestBlocks()
func (cdb *clusterDAGBuilder) handleBlock(n *api.NodeWithMeta, more bool) bool {
if !more {
err := cdb.finalize()
if err != nil {
logger.Error(err)
cdb.error = err
}
return false
}
err := cdb.ingestBlock(n)
if err != nil {
logger.Error(err)
cdb.error = err
return false
}
return true
}

func (cdb *clusterDAGBuilder) ingestBlocks() {
// if this function returns, it means we are Done().
// we auto-cancel ourselves in that case.
// if it was due to an error, it will be in Err().
defer cdb.Cancel()

for {
cont := true

for cont {
select {
case <-cdb.ctx.Done(): // cancelled from outside
return
case n, ok := <-cdb.blocks:
if !ok {
err := cdb.finalize()
if err != nil {
logger.Error(err)
cdb.error = err
}
return // will cancel on defer
}
err := cdb.ingestBlock(n)
if err != nil {
logger.Error(err)
cdb.error = err
return // will cancel on defer
}
// continue with next block
cont = cdb.handleBlock(n, ok)
}
}
}
@@ -65,6 +65,17 @@ func CborDataToNode(raw []byte, format string) (ipld.Node, error) {
return shardNode, nil
}

func makeDAGSimple(dagObj map[string]*cid.Cid) (ipld.Node, error) {
node, err := cbor.WrapObject(
dagObj,
hashFn, mh.DefaultLengths[hashFn],
)
if err != nil {
return nil, err
}
return node, err
}

// makeDAG parses a shardObj which stores all of the node-links a shardDAG
// is responsible for tracking. In general a single node of links may exceed
// the capacity of an ipfs block. In this case an indirect node in the
@@ -75,14 +86,8 @@ func CborDataToNode(raw []byte, format string) (ipld.Node, error) {
func makeDAG(dagObj map[string]*cid.Cid) ([]ipld.Node, error) {
// No indirect node
if len(dagObj) <= MaxLinks {
node, err := cbor.WrapObject(
dagObj,
hashFn, mh.DefaultLengths[hashFn],
)
if err != nil {
return nil, err
}
return []ipld.Node{node}, err
n, err := makeDAGSimple(dagObj)
return []ipld.Node{n}, err
}
// Indirect node required
leafNodes := make([]ipld.Node, 0) // shardNodes with links to data
@@ -100,16 +105,14 @@ func makeDAG(dagObj map[string]*cid.Cid) ([]ipld.Node, error) {
}
leafObj[fmt.Sprintf("%d", j)] = c
}
leafNode, err := cbor.WrapObject(leafObj, hashFn,
mh.DefaultLengths[hashFn])
leafNode, err := makeDAGSimple(leafObj)
if err != nil {
return nil, err
}
indirectObj[fmt.Sprintf("%d", i)] = leafNode.Cid()
leafNodes = append(leafNodes, leafNode)
}
indirectNode, err := cbor.WrapObject(indirectObj, hashFn,
mh.DefaultLengths[hashFn])
indirectNode, err := makeDAGSimple(indirectObj)
if err != nil {
return nil, err
}
@@ -22,7 +22,8 @@ type MockBlockStore interface {
}

// VerifyShards checks that a sharded CID has been correctly formed and stored.
// This is a helper function for testing.
// This is a helper function for testing. It returns a map with all the blocks
// from all shards.
func VerifyShards(t *testing.T, rootCid *cid.Cid, pins MockPinStore, ipfs MockBlockStore, expectedShards int) (map[string]struct{}, error) {
metaPin, err := pins.PinGet(rootCid)
if err != nil {
@@ -40,7 +40,7 @@ func (c *Client) doRequest(method, path string, body io.Reader) (*http.Response,
// eventually we may want to trigger streaming with a boolean flag in
// a single doRequest function to prevent code duplication (same for do)
func (c *Client) doStreamRequest(method, path string, body io.Reader, headers map[string]string) (*http.Response, error) {
urlpath := c.net + "://" + c.hostname + "/" + strings.TrimPrefix(path, "/")
urlpath := c.net + "://" + c.hostname + "/" + strings.TrimPrefix(path, "/")
logger.Debugf("%s: %s", method, urlpath)

r, err := http.NewRequest(method, urlpath, body)
@@ -65,7 +65,7 @@ func (c *Client) doStreamRequest(method, path string, body io.Reader, headers ma
return c.client.Do(r)
}

func (c *Client) doStream(method, path string, body io.Reader, headers map[string]string, obj interface{}) (error) {
func (c *Client) doStream(method, path string, body io.Reader, headers map[string]string, obj interface{}) error {
resp, err := c.doStreamRequest(method, path, body, headers)
if err != nil {
return &api.Error{Code: 0, Message: err.Error()}

0 comments on commit aa5589d

Please sign in to comment.
You can’t perform that action at this time.