Skip to content

Commit

Permalink
Rollback logrus for now, as causes crash on MacOS sirupsen/logrus#1275
Browse files Browse the repository at this point in the history
  • Loading branch information
purplefox committed Aug 30, 2021
1 parent b79eecc commit 59d7fcd
Show file tree
Hide file tree
Showing 18 changed files with 88 additions and 87 deletions.
4 changes: 2 additions & 2 deletions bin/README.hermit.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@

This is a [Hermit](https://github.com/cashapp/hermit) bin directory.

The symlinks in this directory are managed by Hermit and will automatically download and install
Hermit itself as well as packages. These packages are local to this environment.
The symlinks in this directory are managed by Hermit and will automatically download and install Hermit itself as well
as packages. These packages are local to this environment.
9 changes: 5 additions & 4 deletions cluster/dragon/dragon.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import (
"context"
"fmt"
"github.com/lni/dragonboat/v3/client"
log "github.com/sirupsen/logrus"
"log"

"github.com/squareup/pranadb/conf"
"path/filepath"
"sync"
Expand Down Expand Up @@ -73,7 +74,7 @@ type snapshot struct {

func (s *snapshot) Close() {
if err := s.pebbleSnapshot.Close(); err != nil {
log.Errorf("failed to close snapshot %v", err)
log.Printf("failed to close snapshot %v", err)
}
}

Expand Down Expand Up @@ -278,7 +279,7 @@ func (d *Dragon) Start() error {
// https://github.com/squareup/pranadb/issues/124
time.Sleep(10 * time.Second)

log.Infof("Dragon node %d started", d.cnf.NodeID)
log.Printf("Dragon node %d started", d.cnf.NodeID)

return nil
}
Expand Down Expand Up @@ -712,7 +713,7 @@ func (d *Dragon) NodeUnloaded(info raftio.NodeInfo) {
go func() {
err := d.nodeRemovedFromCluster(int(info.NodeID-1), info.ClusterID)
if err != nil {
log.Errorf("failed to remove node from cluster %v", err)
log.Printf("failed to remove node from cluster %v", err)
}
}()
}
Expand Down
4 changes: 2 additions & 2 deletions cluster/dragon/integration/dragon_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ import (
"flag"
"fmt"
"io/ioutil"
"log"
"math/rand"
"os"
"testing"
"time"

log "github.com/sirupsen/logrus"
"github.com/squareup/pranadb/conf"

dragon "github.com/squareup/pranadb/cluster/dragon"
Expand All @@ -31,7 +31,7 @@ var dataDir string
func TestMain(m *testing.M) {
flag.Parse()
if testing.Short() {
log.Infof("-short: skipped")
log.Printf("-short: skipped")
return
}
var err error
Expand Down
8 changes: 4 additions & 4 deletions cmd/pranadb/main.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package main

import (
"os"

"github.com/alecthomas/kong"
"github.com/alecthomas/kong-hcl/v2"
log "github.com/sirupsen/logrus"
"log"
"os"

"github.com/squareup/pranadb/conf"
plog "github.com/squareup/pranadb/log"
"github.com/squareup/pranadb/server"
Expand All @@ -20,7 +20,7 @@ type cli struct {
func main() {
r := &runner{}
if err := r.run(os.Args[1:], true); err != nil {
log.WithError(err).Fatal("startup failed")
log.Printf("startup failed %v", err)
}
select {} // prevent main exiting
}
Expand Down
40 changes: 20 additions & 20 deletions cmd/pranadb/testdata/config.hcl
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// This is the clusterid
cluster-id = 12345
cluster-id = 12345
/*
These are the raft addresses
*/
Expand All @@ -16,34 +16,34 @@ notif-listen-addresses = [
]

// Numshards
num-shards = 50
replication-factor = 3
data-dir = "foo/bar/baz"
test-server = false
data-snapshot-entries = 1001
data-compaction-overhead = 501
sequence-snapshot-entries = 2001
sequence-compaction-overhead = 1001
locks-snapshot-entries = 101
locks-compaction-overhead = 51
debug = true
notifier-heartbeat-interval = "76s"
enable-api-server = true
api-server-listen-addresses = [
num-shards = 50
replication-factor = 3
data-dir = "foo/bar/baz"
test-server = false
data-snapshot-entries = 1001
data-compaction-overhead = 501
sequence-snapshot-entries = 2001
sequence-compaction-overhead = 1001
locks-snapshot-entries = 101
locks-compaction-overhead = 51
debug = true
notifier-heartbeat-interval = "76s"
enable-api-server = true
api-server-listen-addresses = [
"addr7",
"addr8",
"addr9"
]
api-server-session-timeout = "41s"
api-server-session-timeout = "41s"
api-server-session-check-interval = "6s"
log-format = "json"
log-level = "info"
log-file = "-"
log-format = "json"
log-level = "info"
log-file = "-"

kafka-brokers = {
"testbroker" = {
"client-type" = 1,
"properties" = {
"properties" = {
"fakeKafkaID" = "1"
}
}
Expand Down
4 changes: 2 additions & 2 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package common

import (
"fmt"
log "github.com/sirupsen/logrus"
"io"
"log"
"reflect"
"runtime"
"sync/atomic"
Expand Down Expand Up @@ -58,7 +58,7 @@ func InvokeCloser(closer io.Closer) {
if closer != nil {
err := closer.Close()
if err != nil {
log.Errorf("failed to close closer %v", err)
log.Printf("failed to close closer %v", err)
}
}
}
Expand Down
5 changes: 2 additions & 3 deletions kafka/fake_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ package kafka
import (
"fmt"
"github.com/pkg/errors"

log "github.com/sirupsen/logrus"
"log"

"github.com/squareup/pranadb/common"

Expand Down Expand Up @@ -274,7 +273,7 @@ func (g *Group) checkInjectFailure() error {
if g.failureEnd != nil {

if time.Now().Sub(*g.failureEnd) >= 0 {
log.Infof("Failure injection has ended")
log.Printf("Failure injection has ended")
g.failureEnd = nil
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion kafka/ingestor.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func IngestRows(f *FakeKafka, sourceInfo *common.SourceInfo, rows *common.Rows,
ok, err := commontest.WaitUntilWithError(func() (bool, error) {
ingested, committed := topic.TotalMessages(groupID)
// All the messages have been ingested and committed
//log.Infof("start committed %d ingested %d committed %d ingested %d", c, ingestedStart, committed, ingested)
//log.Printf("start committed %d ingested %d committed %d ingested %d", c, ingestedStart, committed, ingested)
if (ingested-ingestedStart == rows.RowCount()) && (ingested-committed) == 0 {
return true, nil
}
Expand Down
3 changes: 1 addition & 2 deletions log/log.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package log

import (
"os"

log "github.com/sirupsen/logrus"
"github.com/squareup/pranadb/perrors"
"os"
)

// Config contains the configuration for the global logger.
Expand Down
12 changes: 6 additions & 6 deletions notifier/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package notifier
import (
"github.com/golang/protobuf/proto"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/squareup/pranadb/common"
"log"
"net"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -63,7 +63,7 @@ func (c *client) makeUnavailable(serverAddress string) {
// Cannot write to server or make connection, it's unavailable - it may be down or there's a network issue
// We remove the server from the set of live servers and add it to the set of unavailable ones
// Unavailable ones will be retried after a delay
log.Errorf("Server became unavailable %s", serverAddress)
log.Printf("Server became unavailable %s", serverAddress)
delete(c.connections, serverAddress)
delete(c.availableServers, serverAddress)
c.unavailableServers[serverAddress] = time.Now()
Expand Down Expand Up @@ -107,7 +107,7 @@ func (c *client) broadcast(nf *NotificationMessage, ri *responseInfo) error {
for serverAddress, failTime := range c.unavailableServers {
if now.Sub(failTime) >= connectionRetryBackoff {
// Put the server back in the available set
log.Warnf("Backoff time for unavailable server %s has expired - adding back to available set", serverAddress)
log.Printf("Backoff time for unavailable server %s has expired - adding back to available set", serverAddress)
delete(c.unavailableServers, serverAddress)
c.availableServers[serverAddress] = struct{}{}
}
Expand Down Expand Up @@ -321,15 +321,15 @@ func (cc *clientConnection) stop() {
}
if err := cc.conn.Close(); err != nil {
// Do nothing - connection might already have been closed (e.g. from client)
log.Errorf("Failed to close connection %v", err)
log.Printf("Failed to close connection %v", err)
}
<-cc.loopCh
cc.client.connectionClosed(cc)
}

func (cc *clientConnection) sendHeartbeat() {
if err := writeMessage(heartbeatMessageType, nil, cc.conn); err != nil {
log.Errorf("failed to send heartbeat %v", err)
log.Printf("failed to send heartbeat %v", err)
cc.heartbeatFailed()
return
}
Expand All @@ -338,7 +338,7 @@ func (cc *clientConnection) sendHeartbeat() {
if cc.hbReceived.Get() {
cc.sendHeartbeat()
} else {
log.Warnf("response heartbeat not received within %f seconds", cc.client.heartbeatInterval.Seconds())
log.Printf("response heartbeat not received within %f seconds", cc.client.heartbeatInterval.Seconds())
cc.heartbeatFailed()
}
})
Expand Down
11 changes: 6 additions & 5 deletions notifier/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ package notifier

import (
"fmt"
"log"
"net"
"sync"

"github.com/pkg/errors"
log "github.com/sirupsen/logrus"

"github.com/squareup/pranadb/common"
)

Expand Down Expand Up @@ -175,7 +176,7 @@ func (c *connection) handleMessage(msgType messageType, msg []byte) {
if msgType == heartbeatMessageType {
if !c.s.responsesDisabled.Get() {
if err := writeMessage(heartbeatMessageType, nil, c.conn); err != nil {
log.Errorf("failed to write heartbeat %v", err)
log.Printf("failed to write heartbeat %v", err)
}
}
return
Expand All @@ -192,19 +193,19 @@ func (c *connection) handleMessageAsync(msg []byte) {
func (c *connection) doHandleMessageAsync(msg []byte) {
nf := &NotificationMessage{}
if err := nf.deserialize(msg); err != nil {
log.Errorf("Failed to deserialize notification %v", err)
log.Printf("Failed to deserialize notification %v", err)
return
}
listener := c.s.lookupNotificationListener(nf.notif)
err := listener.HandleNotification(nf.notif)
ok := true
if err != nil {
log.Errorf("Failed to handle notification %v", err)
log.Printf("Failed to handle notification %v", err)
ok = false
}
if nf.requiresResponse && !c.s.responsesDisabled.Get() {
if err := c.sendResponse(nf, ok); err != nil {
log.Errorf("failed to send response %v", err)
log.Printf("failed to send response %v", err)
}
}
}
Expand Down
13 changes: 7 additions & 6 deletions push/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ package push
import (
"errors"
"fmt"
log "github.com/sirupsen/logrus"
"log"

"github.com/squareup/pranadb/conf"
"github.com/squareup/pranadb/push/mover"
"github.com/squareup/pranadb/push/sched"
Expand Down Expand Up @@ -225,10 +226,10 @@ func (s *shardListener) maybeHandleRemoteBatch() error {
// It's possible an error can occur in handling received rows if the source or aggregate table is not
// yet registered - this could be the case if rows are forwarded right after startup - in this case we can just
// retry
log.Errorf("failed to handle received rows %v will retry after delay", err)
log.Printf("failed to handle received rows %v will retry after delay", err)
time.AfterFunc(remoteBatchRetryDelay, func() {
if err := s.maybeHandleRemoteBatch(); err != nil {
log.Errorf("failed to process remote batch %v", err)
log.Printf("failed to process remote batch %v", err)
}
})
return nil
Expand Down Expand Up @@ -322,20 +323,20 @@ func (p *PushEngine) checkForRowsToForward() error {
// WaitForProcessingToComplete is used in tests to wait for all rows have been processed when ingesting test data
func (p *PushEngine) WaitForProcessingToComplete() error {

log.Infof("Waiting for schedulers to stop")
log.Printf("Waiting for schedulers to stop")
err := p.waitForSchedulers()
if err != nil {
return err
}

log.Infof("Waiting for no rows in forwarder table")
log.Printf("Waiting for no rows in forwarder table")
// Wait for no rows in the forwarder table
err = p.waitForNoRowsInTable(common.ForwarderTableID)
if err != nil {
return err
}

log.Infof("Waiting for no rows in receiver table")
log.Printf("Waiting for no rows in receiver table")
// Wait for no rows in the receiver table
err = p.waitForNoRowsInTable(common.ReceiverTableID)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions push/sched/scheduler.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package sched

import (
log "github.com/sirupsen/logrus"
"log"
"sync"
)

Expand Down Expand Up @@ -97,7 +97,7 @@ func (s *ShardScheduler) runLoop() {
if holder.errChan != nil {
holder.errChan <- err
} else if err != nil {
log.Errorf("Failed to execute action: %v", err)
log.Printf("Failed to execute action: %v", err)
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions push/source/consumer.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package source

import (
log "github.com/sirupsen/logrus"
"github.com/squareup/pranadb/common"
"github.com/squareup/pranadb/kafka"
"github.com/squareup/pranadb/push/sched"
"log"
"time"
)

Expand Down Expand Up @@ -147,7 +147,7 @@ func (m *MessageConsumer) getBatch(pollTimeout time.Duration, maxRecords int) ([
// We've seen the message before - this can be the case if a node crashed after offset was committed in
// Prana but before offset was committed in Kafka.
// In this case we log a warning, and ignore the message, the offset will be committed
log.Warnf("Duplicate message delivery attempted on node %d schema %s source %s topic %s partition %d offset %d"+
log.Printf("Duplicate message delivery attempted on node %d schema %s source %s topic %s partition %d offset %d"+
" Message will be ignored", m.source.cluster.GetNodeID(), m.source.sourceInfo.SchemaName, m.source.sourceInfo.Name, m.source.sourceInfo.TopicInfo.TopicName, partID, msg.PartInfo.Offset)
continue
}
Expand Down
Loading

0 comments on commit 59d7fcd

Please sign in to comment.