Skip to content

Commit

Permalink
Add configuration parameters for JSON maximum line size as well as fi…
Browse files Browse the repository at this point in the history
…le reader buffer size.
  • Loading branch information
cube2222 committed Oct 15, 2022
1 parent cc43f25 commit 6644557
Show file tree
Hide file tree
Showing 9 changed files with 62 additions and 30 deletions.
19 changes: 10 additions & 9 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ octosql "SELECT * FROM plugins.plugins"`,
defer profile.Start(profile.TraceProfile, profile.ProfilePath(".")).Stop()
}
ctx := cmd.Context()
cfg, err := config.Read()
if err != nil {
return fmt.Errorf("couldn't read config: %w", err)
}
ctx = config.ContextWithConfig(ctx, cfg)

debug.SetGCPercent(1000)

logs.InitializeFileLogger()
Expand All @@ -94,11 +100,6 @@ octosql "SELECT * FROM plugins.plugins"`,
}
}()

cfg, err := config.Read()
if err != nil {
return fmt.Errorf("couldn't read config: %w", err)
}

installedPlugins, err := pluginManager.ListInstalledPlugins()
if err != nil {
return fmt.Errorf("couldn't list installed plugins: %w", err)
Expand Down Expand Up @@ -188,15 +189,15 @@ octosql "SELECT * FROM plugins.plugins"`,
if err != nil {
return fmt.Errorf("couldn't get file extension handlers: %w", err)
}
fileHandlers := map[string]func(name string, options map[string]string) (physical.DatasourceImplementation, physical.Schema, error){
fileHandlers := map[string]func(ctx context.Context, name string, options map[string]string) (physical.DatasourceImplementation, physical.Schema, error){
"csv": csv.Creator(','),
"json": json.Creator,
"lines": lines.Creator,
"parquet": parquet.Creator,
"tsv": csv.Creator('\t'),
}
for ext, pluginName := range fileExtensionHandlers {
fileHandlers[ext] = func(name string, options map[string]string) (physical.DatasourceImplementation, physical.Schema, error) {
fileHandlers[ext] = func(ctx context.Context, name string, options map[string]string) (physical.DatasourceImplementation, physical.Schema, error) {
db, err := databases[pluginName]()
if err != nil {
return nil, physical.Schema{}, fmt.Errorf("couldn't get plugin %s database for plugin extensions %s: %w", pluginName, ext, err)
Expand Down Expand Up @@ -513,13 +514,13 @@ func typecheckExpr(ctx context.Context, expr logical.Expression, env physical.En
}

type fileTypeDatabaseCreator struct {
creator func(name string, options map[string]string) (physical.DatasourceImplementation, physical.Schema, error)
creator func(ctx context.Context, name string, options map[string]string) (physical.DatasourceImplementation, physical.Schema, error)
}

func (f *fileTypeDatabaseCreator) ListTables(ctx context.Context) ([]string, error) {
return nil, nil
}

func (f *fileTypeDatabaseCreator) GetTable(ctx context.Context, name string, options map[string]string) (physical.DatasourceImplementation, physical.Schema, error) {
return f.creator(name, options)
return f.creator(ctx, name, options)
}
46 changes: 37 additions & 9 deletions config/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package config

import (
"context"
"fmt"
"io/ioutil"
"log"
Expand All @@ -23,6 +24,7 @@ var octosqlHomeDir = func() string {

type Config struct {
Databases []DatabaseConfig `yaml:"databases"`
Files FilesConfig `yaml:"files"`
}

type DatabaseConfig struct {
Expand All @@ -32,20 +34,34 @@ type DatabaseConfig struct {
Config yaml.Node `yaml:"config"`
}

type FilesConfig struct {
JSON JSONConfig `yaml:"json"`
BufferSizeBytes int `yaml:"buffer_size_bytes"`
}

type JSONConfig struct {
MaxLineSizeBytes int `yaml:"max_line_size_bytes"`
}

func Read() (*Config, error) {
var config Config
data, err := ioutil.ReadFile(filepath.Join(octosqlHomeDir, "octosql.yml"))
if err != nil {
if os.IsNotExist(err) {
return &Config{
Databases: nil,
}, nil
}
if err != nil && os.IsNotExist(err) {
config = Config{}
} else if err != nil {
return nil, fmt.Errorf("couldn't read config file: %w", err)
} else {
if err := yaml.Unmarshal(data, &config); err != nil {
return nil, fmt.Errorf("couldn't unmarshal yaml configuration: %w", err)
}
}

var config Config
if err := yaml.Unmarshal(data, &config); err != nil {
return nil, fmt.Errorf("couldn't unmarshal yaml configuration: %w", err)
// TODO: Refactor to a sensibly structured default value system.
if config.Files.BufferSizeBytes == 0 {
config.Files.BufferSizeBytes = 4096 * 1024
}
if config.Files.JSON.MaxLineSizeBytes == 0 {
config.Files.JSON.MaxLineSizeBytes = 1024 * 1024
}

return &config, nil
Expand Down Expand Up @@ -89,3 +105,15 @@ func (r *PluginReference) UnmarshalText(text []byte) error {
func (r *PluginReference) String() string {
return fmt.Sprintf("%s/%s", r.Repository, r.Name)
}

// TODO: Using a custom context everywhere would be better.

type contextKey struct{}

func ContextWithConfig(ctx context.Context, config *Config) context.Context {
return context.WithValue(ctx, contextKey{}, config)
}

func FromContext(ctx context.Context) *Config {
return ctx.Value(contextKey{}).(*Config)
}
6 changes: 3 additions & 3 deletions datasources/csv/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import (
"github.com/cube2222/octosql/physical"
)

func Creator(separator rune) func(name string, options map[string]string) (physical.DatasourceImplementation, physical.Schema, error) {
return func(name string, options map[string]string) (physical.DatasourceImplementation, physical.Schema, error) {
f, err := files.OpenLocalFile(context.Background(), name, files.WithPreview())
func Creator(separator rune) func(ctx context.Context, name string, options map[string]string) (physical.DatasourceImplementation, physical.Schema, error) {
return func(ctx context.Context, name string, options map[string]string) (physical.DatasourceImplementation, physical.Schema, error) {
f, err := files.OpenLocalFile(ctx, name, files.WithPreview())
if err != nil {
return nil, physical.Schema{}, fmt.Errorf("couldn't open local file: %w", err)
}
Expand Down
3 changes: 2 additions & 1 deletion datasources/json/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/valyala/fastjson"

"github.com/cube2222/octosql/config"
. "github.com/cube2222/octosql/execution"
"github.com/cube2222/octosql/execution/files"
"github.com/cube2222/octosql/octosql"
Expand All @@ -28,7 +29,7 @@ func (d *DatasourceExecuting) Run(ctx ExecutionContext, produce ProduceFn, metaS
defer f.Close()

sc := bufio.NewScanner(f)
sc.Buffer(nil, 1024*1024)
sc.Buffer(nil, config.FromContext(ctx).Files.JSON.MaxLineSizeBytes)

// All async processing in this function and all jobs created by it use this context.
// This means that returning from this function will properly clean up all async processors.
Expand Down
4 changes: 2 additions & 2 deletions datasources/json/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import (
"github.com/cube2222/octosql/physical"
)

func Creator(name string, options map[string]string) (physical.DatasourceImplementation, physical.Schema, error) {
f, err := files.OpenLocalFile(context.Background(), name, files.WithPreview())
func Creator(ctx context.Context, name string, options map[string]string) (physical.DatasourceImplementation, physical.Schema, error) {
f, err := files.OpenLocalFile(ctx, name, files.WithPreview())
if err != nil {
return nil, physical.Schema{}, fmt.Errorf("couldn't open local file: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions datasources/lines/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (
"github.com/cube2222/octosql/physical"
)

func Creator(name string, options map[string]string) (physical.DatasourceImplementation, physical.Schema, error) {
f, err := files.OpenLocalFile(context.Background(), name, files.WithPreview())
func Creator(ctx context.Context, name string, options map[string]string) (physical.DatasourceImplementation, physical.Schema, error) {
f, err := files.OpenLocalFile(ctx, name, files.WithPreview())
if err != nil {
return nil, physical.Schema{}, fmt.Errorf("couldn't open local file: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion datasources/parquet/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/cube2222/octosql/physical"
)

func Creator(name string, options map[string]string) (physical.DatasourceImplementation, physical.Schema, error) {
func Creator(ctx context.Context, name string, options map[string]string) (physical.DatasourceImplementation, physical.Schema, error) {
f, err := os.Open(name)
if err != nil {
return nil, physical.Schema{}, fmt.Errorf("couldn't open file: %w", err)
Expand Down
4 changes: 3 additions & 1 deletion execution/files/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"sync"

"github.com/nxadm/tail"

"github.com/cube2222/octosql/config"
)

type customCloser struct {
Expand Down Expand Up @@ -113,7 +115,7 @@ func OpenLocalFile(ctx context.Context, path string, opts ...OpenFileOption) (io
return nil, fmt.Errorf("couldn't open file: %w", err)
}
return &customCloser{
Reader: bufio.NewReaderSize(f, 4096*1024),
Reader: bufio.NewReaderSize(f, config.FromContext(ctx).Files.BufferSizeBytes),
close: f.Close,
}, nil
} else {
Expand Down
4 changes: 2 additions & 2 deletions physical/physical.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type DatasourceRepository struct {
// Bo inaczej będzie na start strasznie dużo rzeczy ładować niepotrzebnych dla wszystkich
// skonfigurowanych baz danych.
Databases map[string]func() (Database, error)
FileHandlers map[string]func(name string, options map[string]string) (DatasourceImplementation, Schema, error)
FileHandlers map[string]func(ctx context.Context, name string, options map[string]string) (DatasourceImplementation, Schema, error)
}

type Database interface {
Expand All @@ -78,7 +78,7 @@ func (dr *DatasourceRepository) GetDatasource(ctx context.Context, name string,
if index := strings.LastIndex(name, "."); index != -1 {
extension := name[index+1:]
if handler, ok := dr.FileHandlers[extension]; ok {
return handler(name, options)
return handler(ctx, name, options)
}
}

Expand Down

0 comments on commit 6644557

Please sign in to comment.