Skip to content

Commit

Permalink
refactor edge data sync (#382)
Browse files Browse the repository at this point in the history
  • Loading branch information
gertd committed May 16, 2024
1 parent 4bb7e74 commit 1121ad6
Show file tree
Hide file tree
Showing 15 changed files with 505 additions and 840 deletions.
22 changes: 22 additions & 0 deletions .goreleaser.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,28 @@ builds:
- -X github.com/{{ .Env.ORG }}/{{ .Env.REPO }}/pkg/version.commit={{.ShortCommit}}
- -X github.com/{{ .Env.ORG }}/{{ .Env.REPO }}/pkg/version.date={{.Date}}
mod_timestamp: "{{ .CommitTimestamp }}"
- id: topaz-db
main: ./cmd/topaz-db
binary: topaz-db
goos:
- darwin
- linux
- windows
goarch:
- amd64
- arm64
env:
- CGO_ENABLED=0
ignore:
- goos: windows
goarch: arm64
ldflags:
- -s
- -w
- -X github.com/{{ .Env.ORG }}/{{ .Env.REPO }}/pkg/version.ver={{.Version}}
- -X github.com/{{ .Env.ORG }}/{{ .Env.REPO }}/pkg/version.commit={{.ShortCommit}}
- -X github.com/{{ .Env.ORG }}/{{ .Env.REPO }}/pkg/version.date={{.Date}}
mod_timestamp: "{{ .CommitTimestamp }}"

archives:
# https://goreleaser.com/customization/archive/
Expand Down
32 changes: 32 additions & 0 deletions cmd/topaz-db/cmd/cli.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package cmd

import (
"github.com/aserto-dev/topaz/pkg/cli/clients"
)

type CLI struct {
Init InitCmd `cmd:"" help:"create new database file"`
Set SetCmd `cmd:"" help:"set manifest"`
Load LoadCmd `cmd:"" help:"load data"`
Sync SyncCmd `cmd:"" help:"sync data"`
}

type InitCmd struct {
DBFile string `arg:"" help:"db file name"`
}

type SetCmd struct {
DBFile string `arg:"" help:"db file name" type:"existingfile"`
Manifest string `arg:"" help:"manifest file path" type:"existingfile"`
}

type LoadCmd struct {
DBFile string `arg:"" help:"db file name" type:"existingfile"`
DataDir string `arg:"" help:"data file directory" type:"existingdir"`
}

type SyncCmd struct {
DBFile string `arg:"" help:"db file name" type:"existingfile"`
Mode []string `flag:"" short:"m" enum:"manifest,full,diff,watermark" required:"" help:"sync mode"`
clients.DirectoryConfig
}
42 changes: 42 additions & 0 deletions cmd/topaz-db/cmd/init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package cmd

import (
"context"
"fmt"
"io"
"os"
"time"

eds "github.com/aserto-dev/go-edge-ds"
"github.com/aserto-dev/go-edge-ds/pkg/directory"

"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)

func (cmd *InitCmd) Run(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

if fi, err := os.Stat(cmd.DBFile); err == nil {
if fi.IsDir() {
return fmt.Errorf("%s is a directory", cmd.DBFile)
}
return fmt.Errorf("%s already exists", cmd.DBFile)
}

cfg := &directory.Config{
DBPath: cmd.DBFile,
RequestTimeout: 5 * time.Second,
}

logger := zerolog.New(io.Discard)

dir, err := eds.New(ctx, cfg, &logger)
if err != nil {
log.Error().Err(err).Str("db_file", cmd.DBFile).Msg("init_cmd")
}
defer dir.Close()

return nil
}
42 changes: 42 additions & 0 deletions cmd/topaz-db/cmd/load.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package cmd

import (
"context"
"io"
"path/filepath"
"time"

"github.com/aserto-dev/clui"
dsc "github.com/aserto-dev/go-directory-cli/client"
"github.com/aserto-dev/go-edge-ds/pkg/directory"
"github.com/aserto-dev/topaz/cmd/topaz-db/pkg/inproc"

"github.com/rs/zerolog"
)

func (cmd *LoadCmd) Run(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

cfg := &directory.Config{
DBPath: cmd.DBFile,
RequestTimeout: 5 * time.Second,
}

logger := zerolog.New(io.Discard)

conn, cleanup := inproc.NewServer(ctx, &logger, cfg)
defer cleanup()

dsClient, err := dsc.New(conn, clui.NewUI())
if err != nil {
return err
}

files, err := filepath.Glob(filepath.Join(cmd.DataDir, "*.json"))
if err != nil {
return err
}

return dsClient.V3.Import(ctx, files)
}
45 changes: 45 additions & 0 deletions cmd/topaz-db/cmd/set.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package cmd

import (
"context"
"io"
"os"
"time"

"github.com/aserto-dev/clui"
dsc "github.com/aserto-dev/go-directory-cli/client"
"github.com/aserto-dev/go-edge-ds/pkg/directory"
"github.com/aserto-dev/topaz/cmd/topaz-db/pkg/inproc"

"github.com/rs/zerolog"
)

func (cmd *SetCmd) Run(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

cfg := &directory.Config{
DBPath: cmd.DBFile,
RequestTimeout: 5 * time.Second,
}

logger := zerolog.New(io.Discard)

conn, cleanup := inproc.NewServer(ctx, &logger, cfg)
defer cleanup()

dsClient, err := dsc.New(conn, clui.NewUI())
if err != nil {
return err
}

r := os.Stdin
if cmd.Manifest != "" {
r, err = os.Open(cmd.Manifest)
if err != nil {
return err
}
}

return dsClient.V3.SetManifest(ctx, r)
}
53 changes: 53 additions & 0 deletions cmd/topaz-db/cmd/sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package cmd

import (
"context"
"fmt"
"os"
"strings"
"time"

eds "github.com/aserto-dev/go-edge-ds"
"github.com/aserto-dev/go-edge-ds/pkg/datasync"
"github.com/aserto-dev/go-edge-ds/pkg/directory"
"github.com/aserto-dev/topaz/pkg/cli/clients"
"github.com/rs/zerolog"
)

func (cmd *SyncCmd) Run(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

opts := []datasync.Option{}

// validate modes
for _, m := range cmd.Mode {
mode := datasync.StrToMode(strings.ToLower(m))
if mode == datasync.Unknown {
return fmt.Errorf("unknown mode: %s", m)
}
opts = append(opts, datasync.WithMode(mode))
}

// create client conn
conn, err := clients.NewDirectoryConn(ctx, &cmd.DirectoryConfig)
if err != nil {
return err
}
defer conn.Close()

cfg := &directory.Config{
DBPath: cmd.DBFile,
RequestTimeout: 5 * time.Second,
}

logger := zerolog.New(os.Stderr).Level(zerolog.InfoLevel)

dir, err := eds.New(ctx, cfg, &logger)
if err != nil {
return err
}
defer dir.Close()

return dir.DataSyncClient().Sync(ctx, conn, opts...)
}
45 changes: 45 additions & 0 deletions cmd/topaz-db/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package main

import (
"context"
"os"
"os/signal"
"strconv"

"github.com/alecthomas/kong"
"github.com/aserto-dev/topaz/cmd/topaz-db/cmd"
)

func main() {
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
defer stop()

cli := cmd.CLI{}

kongCtx := kong.Parse(&cli,
kong.Name("topaz-db"),
kong.Description("topaz database utility"),
kong.UsageOnError(),
kong.ConfigureHelp(kong.HelpOptions{
NoAppSummary: false,
Summary: false,
Compact: true,
Tree: false,
FlagsLast: true,
Indenter: kong.SpaceIndenter,
NoExpandSubcommands: true,
}),
kong.Vars{
"directory_svc": os.Getenv("TOPAZ_DIRECTORY_SVC"),
"directory_key": os.Getenv("TOPAZ_DIRECTORY_KEY"),
"directory_token": "",
"tenant_id": os.Getenv("ASERTO_TENANT_ID"),
"insecure": strconv.FormatBool(false),
"no_check": strconv.FormatBool(false),
},
)

if err := kongCtx.Run(ctx); err != nil {
kongCtx.FatalIfErrorf(err)
}
}
58 changes: 58 additions & 0 deletions cmd/topaz-db/pkg/inproc/inproc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package inproc

import (
"context"
"net"

dse "github.com/aserto-dev/go-directory/aserto/directory/exporter/v3"
dsi "github.com/aserto-dev/go-directory/aserto/directory/importer/v3"
dsm "github.com/aserto-dev/go-directory/aserto/directory/model/v3"
dsr "github.com/aserto-dev/go-directory/aserto/directory/reader/v3"
dsw "github.com/aserto-dev/go-directory/aserto/directory/writer/v3"

"github.com/aserto-dev/aserto-grpc/grpcutil/middlewares/gerr"
eds "github.com/aserto-dev/go-edge-ds"
"github.com/aserto-dev/go-edge-ds/pkg/directory"
"github.com/rs/zerolog"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/test/bufconn"
)

func NewServer(ctx context.Context, logger *zerolog.Logger, cfg *directory.Config) (*grpc.ClientConn, func()) {
buffer := 1024 * 1024
listener := bufconn.Listen(buffer)

dsLogger := logger.With().Str("component", "ds").Logger()

inProcDirectory, err := eds.New(context.Background(), cfg, &dsLogger)
if err != nil {
logger.Error().Err(err).Msg("failed to start edge directory server")
}

errMiddleware := gerr.NewErrorMiddleware()
s := grpc.NewServer(
grpc.UnaryInterceptor(errMiddleware.Unary()),
grpc.StreamInterceptor(errMiddleware.Stream()),
)

dsm.RegisterModelServer(s, inProcDirectory.Model3())
dsr.RegisterReaderServer(s, inProcDirectory.Reader3())
dsw.RegisterWriterServer(s, inProcDirectory.Writer3())
dse.RegisterExporterServer(s, inProcDirectory.Exporter3())
dsi.RegisterImporterServer(s, inProcDirectory.Importer3())

go func() {
if err := s.Serve(listener); err != nil {
panic(err)
}
}()

// nolint: staticcheck // bufConn does not seem to work with the default DNS provided by grpc.NewClient.
conn, _ := grpc.DialContext(ctx, "", grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
return listener.Dial()
}), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())

return conn, s.GracefulStop
}
Loading

0 comments on commit 1121ad6

Please sign in to comment.