Skip to content

Commit

Permalink
Merge pull request #55 from filecoin-project/reformatting_logs_for_mu…
Browse files Browse the repository at this point in the history
…lti_threading

Reformatting logs for multi threading
  • Loading branch information
binocarlos committed Mar 16, 2022
2 parents 17bc47d + 77beeeb commit a60f080
Show file tree
Hide file tree
Showing 36 changed files with 7,115 additions and 222 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/go.yml
Expand Up @@ -76,4 +76,4 @@ jobs:
run: wget https://dist.ipfs.io/go-ipfs/v0.11.0/go-ipfs_v0.11.0_linux-amd64.tar.gz && tar -xvzf go-ipfs_v0.11.0_linux-amd64.tar.gz && cd go-ipfs && bash install.sh

- name: Test
run: make test
run: make test
3 changes: 2 additions & 1 deletion .gitignore
Expand Up @@ -21,4 +21,5 @@ bin/*
.venv
temperature_sensor_data.csv
temperature_sensor_data.csv.bz2
.ipynb_checkpoints
.ipynb_checkpoints
__debug_bin
8 changes: 8 additions & 0 deletions Makefile
Expand Up @@ -132,6 +132,14 @@ test:
test-devstack:
go test -v -count 1 -timeout 300s -run ^TestDevStack$$ github.com/filecoin-project/bacalhau/cmd/bacalhau

.PHONY: test-commands
test-commands:
go test -v -count 1 -timeout 3000s -run ^TestCommands$$ github.com/filecoin-project/bacalhau/cmd/bacalhau

.PHONY: test-badactors
test-badactors:
go test -v -count 1 -timeout 3000s -run ^TestCatchBadActors$$ github.com/filecoin-project/bacalhau/cmd/bacalhau

################################################################################
# Target: lint #
################################################################################
Expand Down
24 changes: 13 additions & 11 deletions cmd/bacalhau/devstack.go
Expand Up @@ -8,11 +8,12 @@ import (
"strings"
"time"

"github.com/rs/zerolog/log"

"github.com/filecoin-project/bacalhau/internal"
"github.com/filecoin-project/bacalhau/internal/ipfs"

"github.com/spf13/cobra"
"go.uber.org/zap"
)

var devStackBadActors int
Expand All @@ -29,18 +30,16 @@ var devstackCmd = &cobra.Command{
Short: "Start a cluster of 3 bacalhau nodes for testing and development",
RunE: func(cmd *cobra.Command, args []string) error {

log, _ := zap.NewProduction()

result, err := ipfs.IpfsCommand("", []string{"version"})

if err != nil {
log.Error(fmt.Sprintf("Error running command 'ipfs version': %s", err))
log.Error().Msg(fmt.Sprintf("Error running command 'ipfs version': %s", err))
return err
}

if strings.Contains(result, "0.12.0") {
err = fmt.Errorf("\n********************\nDue to a regression, we do not support 0.12.0. Please install from here:\nhttps://ipfs.io/ipns/dist.ipfs.io/go-ipfs/v0.11.0/go-ipfs_v0.11.0_linux-amd64.tar.gz\n********************\n")
log.Error(err.Error())
log.Error().Err(err)
return err
}

Expand All @@ -65,17 +64,20 @@ var devstackCmd = &cobra.Command{
}
}()

for i, node := range stack.Nodes {
fmt.Printf("\nnode %d:\n", i)
fmt.Printf("IPFS_PATH=%s ipfs\n", node.IpfsRepo)
fmt.Printf("go run . --jsonrpc-port=%d list\n", node.JsonRpcPort)
for nodeNumber, node := range stack.Nodes {
log.Info().Msg(fmt.Sprintf(`
Node %d:
IPFS_PATH=%s
JSON_PORT=%d
bin/bacalhau --jsonrpc-port=%d list
`, nodeNumber, node.IpfsRepo, node.JsonRpcPort, node.JsonRpcPort))
}

fmt.Printf(`
log.Info().Msg(fmt.Sprintf(`
To add a file, type the following:
file_path="your_file_path_here"
cid=$( IPFS_PATH=%s ipfs add -q $file_path )
`, stack.Nodes[0].IpfsRepo)
`, stack.Nodes[0].IpfsRepo))

// wait forever because everything else is running in a goroutine
select {}
Expand Down
240 changes: 230 additions & 10 deletions cmd/bacalhau/devstack_test.go
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"io/ioutil"
"log"
"os"
"os/signal"
"reflect"
Expand All @@ -13,31 +12,37 @@ import (
"time"

"github.com/filecoin-project/bacalhau/internal"
_ "github.com/filecoin-project/bacalhau/internal/logger"

"github.com/filecoin-project/bacalhau/internal/ipfs"
"github.com/filecoin-project/bacalhau/internal/system"
"github.com/filecoin-project/bacalhau/internal/traces"
"github.com/filecoin-project/bacalhau/internal/types"
"github.com/stretchr/testify/assert"

"github.com/rs/zerolog/log"
)

// run the job on 2 nodes
const TEST_CONCURRENCY = 2
const TEST_CONCURRENCY = 1

// both nodes must agree on the result
const TEST_CONFIDENCE = 2
const TEST_CONFIDENCE = 1

// the results must be within 10% of each other
const TEST_TOLERANCE = 0.1

func setupTest(t *testing.T) (*internal.DevStack, context.CancelFunc) {
func setupTest(t *testing.T, nodes int, badActors int) (*internal.DevStack, context.CancelFunc) {
ctx := context.Background()
ctxWithCancel, cancelFunction := context.WithCancel(ctx)

os.Setenv("DEBUG", "true")
os.Setenv("LOG_LEVEL", "debug")
os.Setenv("BACALHAU_RUNTIME", "docker")

stack, err := internal.NewDevStack(ctxWithCancel, 3, 0)
stack, err := internal.NewDevStack(ctxWithCancel, nodes, badActors)
assert.NoError(t, err)
if err != nil {
log.Fatalf("Unable to create devstack: %s", err)
log.Fatal().Msg(fmt.Sprintf("Unable to create devstack: %s", err))
}

// we need a better method for this - i.e. waiting for all the ipfs nodes to be ready
Expand All @@ -53,7 +58,7 @@ func teardownTest(stack *internal.DevStack, cancelFunction context.CancelFunc) {
}

func TestDevStack(t *testing.T) {
stack, cancelFunction := setupTest(t)
stack, cancelFunction := setupTest(t, 1, 0)
defer teardownTest(stack, cancelFunction)

c := make(chan os.Signal, 1)
Expand Down Expand Up @@ -131,7 +136,8 @@ raspberry

assert.NoError(t, err)

system.TryUntilSucceedsN(func() error { // nolint
// TODO: Do something with the error
err = system.TryUntilSucceedsN(func() error {
result, err := ListJobs("127.0.0.1", stack.Nodes[0].JsonRpcPort)
if err != nil {
return err
Expand All @@ -154,7 +160,7 @@ raspberry
jobStates = append(jobStates, state.State)
}

if !reflect.DeepEqual(jobStates, []string{"complete", "complete"}) {
if !reflect.DeepEqual(jobStates, []string{"complete"}) {
return fmt.Errorf("expected job to be complete, got %+v", jobStates)
}

Expand All @@ -172,3 +178,217 @@ raspberry

assert.True(t, strings.Contains(string(stdoutText), "kiwi is delicious"))
}

func TestCommands(t *testing.T) {
tests := map[string]struct {
file string
cmd string
contains string
expected_line_count int
}{
"grep": {file: "../../testdata/grep_file.txt", cmd: `timeout 2000 grep kiwi /ipfs/%s || echo "ipfs read timed out"`, contains: "kiwi is delicious", expected_line_count: 4},
// "sed": {file: "../../testdata/sed_file.txt", cmd: "sed -n '/38.7[2-4]..,-9.1[3-7]../p' /ipfs/%s", contains: "LISBON", expected_line_count: 7},
// "awk": {file: "../../testdata/awk_file.txt", cmd: "awk -F',' '{x=38.7077507-$3; y=-9.1365919-$4; if(x^2+y^2<0.3^2) print}' /ipfs/%s", contains: "LISBON", expected_line_count: 7},
}

_ = system.RunCommand("sudo", []string{"pkill", "ipfs"})

stack, cancelFunction := setupTest(t, 3, 0)
defer teardownTest(stack, cancelFunction)

c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
go func() {
for range c {
teardownTest(stack, cancelFunction)
os.Exit(1)
}
}()

for name, tc := range tests {
t.Run(name, func(t *testing.T) {
log.Warn().Msgf(`
========================================
Starting new job:
name: %s
cmd: %s
file: %s
========================================
`, name, tc.cmd, tc.file)

// t.Parallel()

cid, err := add_file_to_nodes(t, stack, tc.file)

assert.NoError(t, err)

job, hostId, err := execute_command(t, stack, tc.cmd, cid, TEST_CONCURRENCY, TEST_CONFIDENCE, TEST_TOLERANCE)
assert.NoError(t, err)

resultsDirectory, err := system.GetSystemDirectory(system.GetResultsDirectory(job.Id, hostId))
assert.NoError(t, err)

stdoutText, err := ioutil.ReadFile(fmt.Sprintf("%s/stdout.log", resultsDirectory))
assert.NoError(t, err)

assert.True(t, strings.Contains(string(stdoutText), tc.contains))
actual_line_count := len(strings.Split(string(stdoutText), "\n"))
assert.Equal(t, actual_line_count, tc.expected_line_count, fmt.Sprintf("Count mismatch:\nExpected: %d\nActual: %d", tc.expected_line_count, actual_line_count))

})
}
}

func add_file_to_nodes(t *testing.T, stack *internal.DevStack, filename string) (string, error) {

fileCid := ""
var err error

// ipfs add the file to 2 nodes
// this tests self selection
for i, node := range stack.Nodes {
if i >= TEST_CONCURRENCY {
continue
}

fileCid, err = ipfs.IpfsCommand(node.IpfsRepo, []string{
"add", "-Q", filename,
})
if err != nil {
log.Debug().Msgf(`Error running ipfs add -Q: %s`, err)
return "", err
}
}

fileCid = strings.TrimSpace(fileCid)

return fileCid, nil
}

func execute_command(
t *testing.T,
stack *internal.DevStack,
cmd string,
fileCid string,
concurrency int,
confidence int,
tolerance float64,
) (*types.Job, string, error) {
var job *types.Job
var err error

err = system.TryUntilSucceedsN(func() error {

log.Debug().Msg(fmt.Sprintf(`About to submit job:
cmd: %s`, fmt.Sprintf(cmd, fileCid)))

job, err = SubmitJob(
[]string{
fmt.Sprintf(cmd, fileCid),
},
[]string{
fileCid,
},
concurrency,
confidence,
tolerance,
"127.0.0.1",
stack.Nodes[0].JsonRpcPort,
)
return err
}, "submit job", 100)

assert.NoError(t, err)

// TODO: Do something with the error
err = system.TryUntilSucceedsN(func() error {
result, err := ListJobs("127.0.0.1", stack.Nodes[0].JsonRpcPort)
if err != nil {
return err
}

var jobData *types.Job

// TODO: Super fragile if executed in parallel.
for _, j := range result.Jobs {
jobData = j
}

actualJobStates := []string{}
requiredJobStates := []string{}

for i := 0; i < concurrency; i++ {
requiredJobStates = append(requiredJobStates, "complete")
}

for _, state := range jobData.State {
actualJobStates = append(actualJobStates, state.State)
}

log.Debug().Msgf("Compare job states:\n%+v\nVS.\n%+v\n", actualJobStates, requiredJobStates)

if !reflect.DeepEqual(actualJobStates, requiredJobStates) {
return fmt.Errorf("Expected job states to be %+v, got %+v", requiredJobStates, actualJobStates)
}

return nil
}, "wait for results to be", 100)

hostId, err := stack.Nodes[0].ComputeNode.Scheduler.HostId()
assert.NoError(t, err)

return job, hostId, nil
}

func TestCatchBadActors(t *testing.T) {

t.Skip()

tests := map[string]struct {
nodes int
concurrency int
confidence int
tolerance float64
badActors int
expectation bool
}{
"two_agree": {nodes: 3, concurrency: 3, confidence: 2, tolerance: 0.1, badActors: 0, expectation: true},
// "one_bad_actor": {nodes: 3, concurrency: 2, confidence: 2, tolerance: 0.1, badActors: 1, expectation: false},
}

// TODO: #57 This is stupid (for now) but need to add the %s at the end because we don't have an elegant way to run without a cid (yet). Will fix later.
commands := []string{
`python3 -c "import time; x = '0'*1024*1024*100; time.sleep(10); %s"`,
}

for name, tc := range tests {
t.Run(name, func(t *testing.T) {
// t.Parallel()

stack, cancelFunction := setupTest(t, tc.nodes, tc.badActors)
defer teardownTest(stack, cancelFunction)

c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
go func() {
for range c {
teardownTest(stack, cancelFunction)
os.Exit(1)
}
}()

job, _, err := execute_command(t, stack, commands[0], "", tc.concurrency, tc.confidence, tc.tolerance)
assert.NoError(t, err, "Error executing command: %+v", err)

resultsList, err := system.ProcessJobIntoResults(job)
assert.NoError(t, err, "Error processing job into results: %+v", err)

correctGroup, incorrectGroup, err := traces.ProcessResults(job, resultsList)

assert.Equal(t, (len(correctGroup)-len(incorrectGroup)) == tc.nodes, fmt.Sprintf("Expected %d good actors, got %d", tc.nodes, len(correctGroup)))
assert.Equal(t, (len(incorrectGroup)) == tc.badActors, fmt.Sprintf("Expected %d bad actors, got %d", tc.badActors, len(incorrectGroup)))
assert.NoError(t, err, "Expected to run with no error. Actual: %+v", err)

})
}
}

0 comments on commit a60f080

Please sign in to comment.