Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
216 changes: 194 additions & 22 deletions go-sdk/bundle/bundlev1/bundlev1server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"log/slog"
"os"
"runtime/debug"

"github.com/evanphx/go-hclog-slog/hclogslog"
"github.com/hashicorp/go-hclog"
Expand All @@ -32,9 +33,34 @@ import (
"github.com/apache/airflow/go-sdk/bundle/bundlev1/bundlev1server/impl"
"github.com/apache/airflow/go-sdk/pkg/bundles/shared"
"github.com/apache/airflow/go-sdk/pkg/config"
"github.com/apache/airflow/go-sdk/pkg/execution"
)

var versionInfo *bool = flag.Bool("bundle-metadata", false, "show the embedded bundle info")
// sdkModulePath is the import path of the SDK module. Used to identify the
// SDK version from the bundle binary's build info dependencies.
const sdkModulePath = "github.com/apache/airflow/go-sdk"

// Flags. The bundle-metadata flag is the existing ADR 0001 introspection
// hook; --comm and --logs select the coordinator-mode protocol added by
// ADR 0003. All three are read by Serve to choose a server mode below.
var (
versionInfo = flag.Bool("bundle-metadata", false, "show the embedded bundle info")
dumpSpec = flag.Bool(
"dump-bundle-spec",
false,
"print the bundle spec JSON (sdk + dags) used by airflow-go-pack and exit",
)
commAddr = flag.String(
"comm",
"",
"host:port of the supervisor's coordinator comm channel (selects coordinator mode)",
)
logsAddr = flag.String(
"logs",
"",
"host:port of the supervisor's coordinator logs channel (selects coordinator mode)",
)
)

// ServeOpt is an interface for defining options that can be passed to the
// Serve function. Each implementation modifies the ServeConfig being
Expand All @@ -52,41 +78,188 @@ func (s serveConfigFunc) ApplyServeOpt(in *ServerConfig) error {

type ServerConfig struct{}

// Serve is the entrypoint for your bundle, and sets it up ready for Airflow's Go Worker to use
// serveMode tags the protocol the binary will speak this run.
type serveMode int

const (
modePlugin serveMode = iota // go-plugin gRPC (existing Edge Worker path)
modeMetadataDump // --bundle-metadata: print BundleInfo JSON
modeSpecDump // --dump-bundle-spec: print bundle spec JSON (ADR 0002)
modeCoordinator // --comm/--logs: msgpack-over-IPC (ADR 0003)
modeUsageError // misuse: print usage and exit non-zero
)

// Serve is the entrypoint for your bundle, and sets it up ready for Airflow's
// Go Worker (go-plugin) or Python supervisor (coordinator protocol) to use.
//
// Zero or more options to configure the server may also be passed. There are no options yet, this is to allow
// future changes without breaking compatibility
// The mode is decided from CLI flags and process environment, so user code is
// always one line:
//
// func main() { bundlev1server.Serve(&myBundle{}) }
//
// Zero or more options to configure the server may also be passed. There are
// no options yet; the parameter exists to allow future additions without
// breaking compatibility.
func Serve(bundle bundlev1.BundleProvider, opts ...ServeOpt) error {
config.SetupViper("")

hcLogger := hclog.New(&hclog.LoggerOptions{
Level: hclog.Trace,
Output: os.Stderr,
JSONFormat: true,
IncludeLocation: true,
AdditionalLocationOffset: 3,
})

log := slog.New(hclogslog.Adapt(hcLogger))
slog.SetDefault(log)

flag.Parse()

serveConfig := &ServerConfig{}
for _, c := range opts {
c.ApplyServeOpt(serveConfig)
}

switch decideMode() {
case modeMetadataDump:
return dumpBundleMetadata(bundle)
case modeSpecDump:
return dumpBundleSpec(bundle)
case modeCoordinator:
// In coordinator mode the supervisor reads the logs channel for
// structured records, so configuring the hclog/stderr default
// logger here is unnecessary — execution.Serve installs its own
// slog handler against the logs socket before any user code runs.
return execution.Serve(bundle, *commAddr, *logsAddr)
case modePlugin:
installPluginLogger()
return servePlugin(bundle)
case modeUsageError:
fmt.Fprintln(os.Stderr, "error: --comm and --logs must be supplied together")
flag.CommandLine.SetOutput(os.Stderr)
flag.Usage()
os.Exit(2)
}
return nil
}

func decideMode() serveMode {
if *versionInfo {
meta := bundle.GetBundleVersion()
data, err := json.MarshalIndent(meta, "", " ")
if err != nil {
return err
return modeMetadataDump
}
if *dumpSpec {
return modeSpecDump
}
commSet := *commAddr != ""
logsSet := *logsAddr != ""
if commSet && logsSet {
return modeCoordinator
}
if commSet || logsSet {
// Partial use is a hard error per ADR 0003: both flags are
// required, otherwise the supervisor is misconfigured and the
// runtime should fail loudly rather than fall through to
// go-plugin (which would hang on the missing magic-cookie).
return modeUsageError
}
return modePlugin
}

func dumpBundleMetadata(bundle bundlev1.BundleProvider) error {
meta := bundle.GetBundleVersion()
data, err := json.MarshalIndent(meta, "", " ")
if err != nil {
return err
}
fmt.Println(string(data))
return nil
}

// bundleSpec is the wire shape printed by --dump-bundle-spec. The schema is
// stable per ADR 0002 and consumed by airflow-go-pack to populate the
// bundle's airflow-metadata.yaml at build time.
type bundleSpec struct {
FormatVersion string `json:"format_version"`
SDK bundleSpecSDK `json:"sdk"`
Dags map[string]bundleSpecDag `json:"dags"`
}

type bundleSpecSDK struct {
Language string `json:"language"`
Version string `json:"version"`
}

type bundleSpecDag struct {
Tasks []string `json:"tasks"`
}

// dumpBundleSpec runs the bundle's RegisterDags against an in-memory recorder
// and writes the bundle spec JSON to stdout. It must not start the gRPC
// server or contact any external services; the recorder is the only side
// effect.
func dumpBundleSpec(bundle bundlev1.BundleProvider) error {
reg := bundlev1.New()
if err := bundle.RegisterDags(reg); err != nil {
return fmt.Errorf("registering dags: %w", err)
}

enum, ok := reg.(bundlev1.EnumerableBundle)
if !ok {
return fmt.Errorf("registry does not implement EnumerableBundle")
}

spec := bundleSpec{
FormatVersion: "1.0",
SDK: bundleSpecSDK{
Language: "go",
Version: sdkVersion(),
},
Dags: make(map[string]bundleSpecDag),
}
for _, dag := range enum.OrderedDags() {
taskIDs := make([]string, 0, len(dag.Tasks))
for _, t := range dag.Tasks {
taskIDs = append(taskIDs, t.ID)
}
fmt.Println(string(data))
return nil
spec.Dags[dag.DagID] = bundleSpecDag{Tasks: taskIDs}
}

data, err := json.MarshalIndent(spec, "", " ")
if err != nil {
return err
}
fmt.Println(string(data))
return nil
}

// sdkVersion returns the version of the SDK module linked into this binary,
// derived from runtime/debug.ReadBuildInfo. Falls back to "(devel)" when
// build info is unavailable (e.g. tests, bundle binaries built from a local
// replace directive).
func sdkVersion() string {
info, ok := debug.ReadBuildInfo()
if !ok {
return "(devel)"
}
if info.Main.Path == sdkModulePath && info.Main.Version != "" {
return info.Main.Version
}
for _, dep := range info.Deps {
if dep.Path == sdkModulePath {
if dep.Replace != nil && dep.Replace.Version != "" {
return dep.Replace.Version
}
if dep.Version != "" {
return dep.Version
}
}
}
return "(devel)"
}

func installPluginLogger() {
hcLogger := hclog.New(&hclog.LoggerOptions{
Level: hclog.Trace,
Output: os.Stderr,
JSONFormat: true,
IncludeLocation: true,
AdditionalLocationOffset: 3,
})
log := slog.New(hclogslog.Adapt(hcLogger))
slog.SetDefault(log)
}

func servePlugin(bundle bundlev1.BundleProvider) error {
pluginConfig := &plugin.ServeConfig{
HandshakeConfig: shared.Handshake,
Plugins: plugin.PluginSet{
Expand All @@ -99,6 +272,5 @@ func Serve(bundle bundlev1.BundleProvider, opts ...ServeOpt) error {

// Likely never returns
plugin.Serve(pluginConfig)

return nil
}
Loading
Loading