Skip to content

Commit

Permalink
feat: Write logged features to Offline Store (Go - Python integration) (
Browse files Browse the repository at this point in the history
#2621)

* write logged features by path

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* fix dummy sink

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* share snowflake cursor and stage name

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* share snowflake cursor and stage name

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* e2e tests WIP

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* graceful stop (grpc & logging)

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* revert accidental change

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* add comments to go embedded methods

Signed-off-by: Oleksii Moskalenko <oleksii@oleksiis-mbp.lan>

Co-authored-by: Oleksii Moskalenko <oleksii@oleksiis-mbp.lan>
  • Loading branch information
pyalex and Oleksii Moskalenko committed May 3, 2022
1 parent cff0133 commit ccad832
Show file tree
Hide file tree
Showing 24 changed files with 612 additions and 114 deletions.
63 changes: 54 additions & 9 deletions go/embedded/online_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,22 @@ import (
"os"
"os/signal"
"syscall"

"google.golang.org/grpc"

"github.com/feast-dev/feast/go/internal/feast/server"
"github.com/feast-dev/feast/go/internal/feast/server/logging"
"github.com/feast-dev/feast/go/protos/feast/serving"
"time"

"github.com/apache/arrow/go/v8/arrow"
"github.com/apache/arrow/go/v8/arrow/array"
"github.com/apache/arrow/go/v8/arrow/cdata"
"github.com/apache/arrow/go/v8/arrow/memory"
"google.golang.org/grpc"

"github.com/feast-dev/feast/go/internal/feast"
"github.com/feast-dev/feast/go/internal/feast/model"
"github.com/feast-dev/feast/go/internal/feast/onlineserving"
"github.com/feast-dev/feast/go/internal/feast/registry"
"github.com/feast-dev/feast/go/internal/feast/server"
"github.com/feast-dev/feast/go/internal/feast/server/logging"
"github.com/feast-dev/feast/go/internal/feast/transformation"
"github.com/feast-dev/feast/go/protos/feast/serving"
prototypes "github.com/feast-dev/feast/go/protos/feast/types"
"github.com/feast-dev/feast/go/types"
)
Expand All @@ -44,6 +43,15 @@ type DataTable struct {
SchemaPtr uintptr
}

// LoggingOptions is a public (embedded) copy of logging.LoggingOptions struct.
// See logging.LoggingOptions for properties description.
// StartGprcServerWithLogging copies every field one-to-one into a
// logging.LoggingOptions value when it creates the logging service.
type LoggingOptions struct {
// NOTE(review): presumably the capacity of the channel that carries emitted logs - confirm against logging.LoggingOptions.
ChannelCapacity int
// NOTE(review): presumably how long emitting a single log may wait before giving up - confirm against logging.LoggingOptions.
EmitTimeout time.Duration
// WriteInterval is the ticker period at which the logger loop writes its buffered batch to the sink.
WriteInterval time.Duration
// FlushInterval is the ticker period at which the logger loop calls Flush on the sink.
FlushInterval time.Duration
}

func NewOnlineFeatureService(conf *OnlineFeatureServiceConfig, transformationCallback transformation.TransformationCallback) *OnlineFeatureService {
repoConfig, err := registry.NewRepoConfigFromJSON(conf.RepoPath, conf.RepoConfig)
if err != nil {
Expand Down Expand Up @@ -214,17 +222,50 @@ func (s *OnlineFeatureService) GetOnlineFeatures(
return nil
}

// StartGprcServer starts the gRPC server with feature logging disabled and blocks the thread.
// It is a thin wrapper around StartGprcServerWithLogging and returns that call's error unchanged.
func (s *OnlineFeatureService) StartGprcServer(host string, port int) error {
	// A nil write callback means no logging service is created; the logging
	// options are only read when a callback is supplied, so the zero value is enough.
	var unusedOpts LoggingOptions
	return s.StartGprcServerWithLogging(host, port, nil, unusedOpts)
}

// StartGprcServerWithLoggingDefaultOpts starts the gRPC server with feature logging enabled,
// taking every logging setting from logging.DefaultOptions.
// The caller must provide the (Python) callback that is used to flush buffered logs.
func (s *OnlineFeatureService) StartGprcServerWithLoggingDefaultOpts(host string, port int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback) error {
	// Mirror the package defaults field by field into the embedded options type.
	var opts LoggingOptions
	opts.ChannelCapacity = logging.DefaultOptions.ChannelCapacity
	opts.EmitTimeout = logging.DefaultOptions.EmitTimeout
	opts.WriteInterval = logging.DefaultOptions.WriteInterval
	opts.FlushInterval = logging.DefaultOptions.FlushInterval

	return s.StartGprcServerWithLogging(host, port, writeLoggedFeaturesCallback, opts)
}

// StartGprcServerWithLogging starts gRPC server with enabled feature logging
// Caller of this function must provide Python callback to flush buffered logs as well as logging configuration (loggingOpts)
func (s *OnlineFeatureService) StartGprcServerWithLogging(host string, port int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts LoggingOptions) error {
var loggingService *logging.LoggingService = nil
var err error
if writeLoggedFeaturesCallback != nil {
sink, err := logging.NewOfflineStoreSink(writeLoggedFeaturesCallback)
if err != nil {
return err
}

loggingService, err = logging.NewLoggingService(s.fs, sink, logging.LoggingOptions{
ChannelCapacity: loggingOpts.ChannelCapacity,
EmitTimeout: loggingOpts.EmitTimeout,
WriteInterval: loggingOpts.WriteInterval,
FlushInterval: loggingOpts.FlushInterval,
})
if err != nil {
return err
}
}
ser := server.NewGrpcServingServiceServer(s.fs, loggingService)
log.Printf("Starting a gRPC server on host %s port %d\n", host, port)
lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", host, port))
if err != nil {
return err
}
log.Printf("Listening a gRPC server on host %s port %d\n", host, port)

grpcServer := grpc.NewServer()
serving.RegisterServingServiceServer(grpcServer, ser)
Expand All @@ -234,6 +275,10 @@ func (s *OnlineFeatureService) StartGprcServer(host string, port int) error {
<-s.grpcStopCh
fmt.Println("Stopping the gRPC server...")
grpcServer.GracefulStop()
if loggingService != nil {
loggingService.Stop()
}
fmt.Println("gRPC server terminated")
}()

err = grpcServer.Serve(lis)
Expand Down
2 changes: 1 addition & 1 deletion go/internal/feast/server/logging/filelogsink.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (s *FileLogSink) Write(record arrow.Record) error {
return pqarrow.WriteTable(table, writer, 100, props, arrProps)
}

func (s *FileLogSink) Flush() error {
func (s *FileLogSink) Flush(featureServiceName string) error {
// files are already flushed during Write
return nil
}
26 changes: 19 additions & 7 deletions go/internal/feast/server/logging/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"log"
"math/rand"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -37,7 +38,7 @@ type LogSink interface {
// Flush actually sends data to a sink.
// We want to control the amount of interaction with the sink, since it could be a costly operation.
// Also, some sinks like BigQuery might have quotas that physically limit the number of write requests per day.
Flush() error
Flush(featureServiceName string) error
}

type Logger interface {
Expand Down Expand Up @@ -135,6 +136,10 @@ func (l *LoggerImpl) loggerLoop() (lErr error) {
lErr = errors.WithStack(rErr)
}
}()

writeTicker := time.NewTicker(l.config.WriteInterval)
flushTicker := time.NewTicker(l.config.FlushInterval)

for {
shouldStop := false

Expand All @@ -144,18 +149,18 @@ func (l *LoggerImpl) loggerLoop() (lErr error) {
if err != nil {
log.Printf("Log write failed: %+v", err)
}
err = l.sink.Flush()
err = l.sink.Flush(l.featureServiceName)
if err != nil {
log.Printf("Log flush failed: %+v", err)
}
shouldStop = true
case <-time.After(l.config.WriteInterval):
case <-writeTicker.C:
err := l.buffer.writeBatch(l.sink)
if err != nil {
log.Printf("Log write failed: %+v", err)
}
case <-time.After(l.config.FlushInterval):
err := l.sink.Flush()
case <-flushTicker.C:
err := l.sink.Flush(l.featureServiceName)
if err != nil {
log.Printf("Log flush failed: %+v", err)
}
Expand All @@ -171,6 +176,9 @@ func (l *LoggerImpl) loggerLoop() (lErr error) {
}
}

writeTicker.Stop()
flushTicker.Stop()

// Notify all waiters for graceful stop
l.cond.L.Lock()
l.isStopped = true
Expand Down Expand Up @@ -225,7 +233,11 @@ func (l *LoggerImpl) Log(joinKeyToEntityValues map[string][]*types.Value, featur
for idx, featureName := range l.schema.Features {
featureIdx, ok := featureNameToVectorIdx[featureName]
if !ok {
return errors.Errorf("Missing feature %s in log data", featureName)
featureNameParts := strings.Split(featureName, "__")
featureIdx, ok = featureNameToVectorIdx[featureNameParts[1]]
if !ok {
return errors.Errorf("Missing feature %s in log data", featureName)
}
}
featureValues[idx] = featureVectors[featureIdx].Values[rowIdx]
featureStatuses[idx] = featureVectors[featureIdx].Statuses[rowIdx]
Expand Down Expand Up @@ -259,7 +271,7 @@ func (l *LoggerImpl) Log(joinKeyToEntityValues map[string][]*types.Value, featur
EventTimestamps: eventTimestamps,

RequestId: requestId,
LogTimestamp: time.Now(),
LogTimestamp: time.Now().UTC(),
}
err := l.EmitLog(&newLog)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go/internal/feast/server/logging/logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (s *DummySink) Write(rec arrow.Record) error {
return nil
}

func (s *DummySink) Flush() error {
func (s *DummySink) Flush(featureServiceName string) error {
return nil
}

Expand Down
3 changes: 1 addition & 2 deletions go/internal/feast/server/logging/memorybuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package logging

import (
"fmt"
"time"

"github.com/apache/arrow/go/v8/arrow"
"github.com/apache/arrow/go/v8/arrow/array"
Expand Down Expand Up @@ -143,7 +142,7 @@ func (b *MemoryBuffer) convertToArrowRecord() (arrow.Record, error) {
}

logTimestamp := arrow.Timestamp(logRow.LogTimestamp.UnixMicro())
logDate := arrow.Date32(logRow.LogTimestamp.Truncate(24 * time.Hour).Unix())
logDate := arrow.Date32FromTime(logRow.LogTimestamp)

builder.Field(fieldNameToIdx[LOG_TIMESTAMP_FIELD]).(*array.TimestampBuilder).UnsafeAppend(logTimestamp)
builder.Field(fieldNameToIdx[LOG_DATE_FIELD]).(*array.Date32Builder).UnsafeAppend(logDate)
Expand Down
2 changes: 1 addition & 1 deletion go/internal/feast/server/logging/memorybuffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func TestSerializeToArrowTable(t *testing.T) {
// log date
today := time.Now().Truncate(24 * time.Hour)
builder.Field(8).(*array.Date32Builder).AppendValues(
[]arrow.Date32{arrow.Date32(today.Unix()), arrow.Date32(today.Unix())}, []bool{true, true})
[]arrow.Date32{arrow.Date32FromTime(today), arrow.Date32FromTime(today)}, []bool{true, true})

// request id
builder.Field(9).(*array.StringBuilder).AppendValues(
Expand Down
83 changes: 83 additions & 0 deletions go/internal/feast/server/logging/offlinestoresink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package logging

import (
"errors"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"

"github.com/apache/arrow/go/v8/arrow"
"github.com/apache/arrow/go/v8/arrow/array"
"github.com/apache/arrow/go/v8/parquet"
"github.com/apache/arrow/go/v8/parquet/pqarrow"
"github.com/google/uuid"
)

// OfflineStoreWriteCallback is the function OfflineStoreSink.Flush invokes to hand over
// the directory of parquet files (datasetDir) collected for featureServiceName.
// An empty return value signals success; any non-empty string is turned into an
// error by Flush, using the string as the error message.
type OfflineStoreWriteCallback func(featureServiceName, datasetDir string) string

// OfflineStoreSink is a log sink that writes each record as a parquet file into a
// temporary directory and passes that directory to writeCallback when flushed.
type OfflineStoreSink struct {
// datasetDir is the temporary directory currently collecting parquet files.
// An empty string means no directory exists at the moment; one is created on the next Write.
datasetDir string
// writeCallback is called by Flush with the feature service name and datasetDir.
writeCallback OfflineStoreWriteCallback
}

// NewOfflineStoreSink builds a sink that hands written data to writeCallback on flush.
// No dataset directory is set at construction time. The returned error is always nil.
func NewOfflineStoreSink(writeCallback OfflineStoreWriteCallback) (*OfflineStoreSink, error) {
	sink := new(OfflineStoreSink) // datasetDir starts out as "", i.e. no directory yet
	sink.writeCallback = writeCallback
	return sink, nil
}

// getOrCreateDatasetDir returns the directory that collects parquet files for the
// next flush. When none is set, a fresh temporary directory is created and remembered.
func (s *OfflineStoreSink) getOrCreateDatasetDir() (string, error) {
	if s.datasetDir == "" {
		created, err := ioutil.TempDir("", "*")
		if err != nil {
			return "", err
		}
		// Remember the directory so that subsequent writes land in the same place.
		s.datasetDir = created
	}
	return s.datasetDir, nil
}

// cleanCurrentDatasetDir forgets the current dataset directory and removes it from
// disk together with everything inside it. It does nothing when no directory is set.
func (s *OfflineStoreSink) cleanCurrentDatasetDir() error {
	stale := s.datasetDir
	if stale == "" {
		return nil
	}
	// Clear the field before deleting so the next write starts a new directory
	// even when the removal itself fails.
	s.datasetDir = ""
	return os.RemoveAll(stale)
}

// Write stores record as a new, uniquely named parquet file inside the sink's
// current dataset directory, creating that directory on first use.
//
// It returns an error when the file name cannot be generated, the directory or
// the file cannot be created, the parquet encoding fails, or closing the file fails.
func (s *OfflineStoreSink) Write(record arrow.Record) (err error) {
	// Do not ignore a failed UUID generation: it would yield the zero UUID and
	// make successive writes overwrite the same file.
	fileName, err := uuid.NewUUID()
	if err != nil {
		return err
	}
	datasetDir, err := s.getOrCreateDatasetDir()
	if err != nil {
		return err
	}

	file, err := os.Create(filepath.Join(datasetDir, fmt.Sprintf("%s.parquet", fileName.String())))
	if err != nil {
		return err
	}
	// Always release the file handle, including on encoding errors.
	// NOTE(review): the parquet writer may already have closed the file on the
	// success path; a second Close then reports os.ErrClosed, which is ignored.
	defer func() {
		closeErr := file.Close()
		if closeErr != nil && !errors.Is(closeErr, os.ErrClosed) && err == nil {
			err = closeErr
		}
	}()
	var writer io.Writer = file

	table := array.NewTableFromRecords(record.Schema(), []arrow.Record{record})
	// The table holds its own references to the record's columns; drop them when done.
	defer table.Release()

	props := parquet.NewWriterProperties(parquet.WithDictionaryDefault(false))
	arrProps := pqarrow.DefaultWriterProps()
	return pqarrow.WriteTable(table, writer, 1000, props, arrProps)
}

// Flush passes the current dataset directory to the write callback for
// featureServiceName and, when the callback succeeds, removes that directory.
// It does nothing when no directory is set. A non-empty string returned by the
// callback is returned as an error, and the directory is left in place.
func (s *OfflineStoreSink) Flush(featureServiceName string) error {
	dir := s.datasetDir
	if dir == "" {
		return nil
	}

	if failure := s.writeCallback(featureServiceName, dir); failure != "" {
		return errors.New(failure)
	}

	return s.cleanCurrentDatasetDir()
}
7 changes: 7 additions & 0 deletions go/internal/feast/server/logging/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,10 @@ func (s *LoggingService) GetOrCreateLogger(featureService *model.FeatureService)

return logger, nil
}

// Stop shuts down every logger held by the service, one at a time: each logger is
// asked to stop and the call waits until it has stopped before moving to the next.
func (s *LoggingService) Stop() {
	for key := range s.loggers {
		current := s.loggers[key]
		current.Stop()
		current.WaitUntilStopped()
	}
}
4 changes: 2 additions & 2 deletions go/internal/test/go_integration_test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func SetupInitializedRepo(basePath string) error {
// var stderr bytes.Buffer
// var stdout bytes.Buffer
applyCommand.Dir = featureRepoPath
out, err := applyCommand.Output()
out, err := applyCommand.CombinedOutput()
if err != nil {
log.Println(string(out))
return err
Expand All @@ -152,7 +152,7 @@ func SetupInitializedRepo(basePath string) error {
materializeCommand := exec.Command("feast", "materialize-incremental", formattedTime)
materializeCommand.Env = os.Environ()
materializeCommand.Dir = featureRepoPath
out, err = materializeCommand.Output()
out, err = materializeCommand.CombinedOutput()
if err != nil {
log.Println(string(out))
return err
Expand Down
Loading

0 comments on commit ccad832

Please sign in to comment.