Implement Web API connector interfaces #30815

Closed (4 commits)
55 changes: 55 additions & 0 deletions sdks/go/pkg/beam/io/webapi/doc.go
@@ -0,0 +1,55 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/*
Package webapi supports reading from or writing to Web APIs.
Review comment (Contributor):

I know this is a work in progress, but the end goal isn't yet clear from this PR. That is the context of the following comment.

As presently provided, this is an alternative API wrapper for a DoFn that has a dead letter queue. So how different is the best future experience this API provides versus a user writing a DoFn directly? What do that future experience and its benefits look like?

Remember, the point of an API wrapper like this is to make it easier for a user to do a complicated thing than doing that complicated thing directly in the framework. Right now, it's about the same. But adding throttling, retries, or similar features that are hard to implement correctly, and that could be made convenient and consistent, changes that quickly.
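
To make the retries point concrete, here is a minimal sketch of a retry decorator layered over a Caller. It is not part of this PR: `retryingCaller`, `flakyCaller`, and `callWithRetries` are hypothetical names, the `Request`, `Response`, and `Caller` types are local copies so the sketch compiles on its own, and the backoff policy (doubling delay, fixed attempt count) is only one assumption among many possible ones.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Request and Response are local stand-ins for the byte-payload types in this PR.
type Request struct{ Payload []byte }
type Response struct{ Payload []byte }

// Caller is a local copy of the interface proposed in this PR.
type Caller interface {
	Call(ctx context.Context, request *Request) (*Response, error)
}

// retryingCaller is a hypothetical decorator that retries a wrapped Caller
// with exponential backoff. Nothing like it exists in the PR.
type retryingCaller struct {
	inner       Caller
	maxAttempts int
	baseDelay   time.Duration
}

func (r *retryingCaller) Call(ctx context.Context, req *Request) (*Response, error) {
	var lastErr error
	delay := r.baseDelay
	for attempt := 1; attempt <= r.maxAttempts; attempt++ {
		resp, err := r.inner.Call(ctx, req)
		if err == nil {
			return resp, nil
		}
		lastErr = err
		if attempt == r.maxAttempts {
			break
		}
		// Wait before the next attempt, but stop early if ctx is cancelled.
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case <-time.After(delay):
		}
		delay *= 2 // double the wait between attempts
	}
	return nil, fmt.Errorf("giving up after %d attempts: %w", r.maxAttempts, lastErr)
}

// flakyCaller fails a fixed number of times, then echoes the payload.
type flakyCaller struct{ failuresLeft, calls int }

func (f *flakyCaller) Call(_ context.Context, req *Request) (*Response, error) {
	f.calls++
	if f.failuresLeft > 0 {
		f.failuresLeft--
		return nil, errors.New("transient failure")
	}
	return &Response{Payload: req.Payload}, nil
}

// callWithRetries sends one request through a retrying wrapper and reports
// the echoed payload, the number of attempts made, and any final error.
func callWithRetries(failures, maxAttempts int) (string, int, error) {
	flaky := &flakyCaller{failuresLeft: failures}
	caller := &retryingCaller{inner: flaky, maxAttempts: maxAttempts, baseDelay: time.Millisecond}
	resp, err := caller.Call(context.Background(), &Request{Payload: []byte("ping")})
	if err != nil {
		return "", flaky.calls, err
	}
	return string(resp.Payload), flaky.calls, nil
}

func main() {
	payload, attempts, err := callWithRetries(2, 5)
	fmt.Println(payload, attempts, err)
}
```

If the package shipped something along these lines, a user would only write the inner Caller and get consistent retry behavior for free, which is the kind of benefit a hand-written DoFn does not provide.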


Its design goals are to reduce the boilerplate of building Beam I/O connectors. See tracking issue:
https://github.com/apache/beam/issues/30423 and visit the Beam website
(https://beam.apache.org/documentation/io/built-in/webapis/) for details and examples.

# Basic usage

Basic usage requires providing a Caller to the Call func.

var _ webapi.Caller = &myCaller{}
type myCaller struct {
// Make configuration details public and tag with: `beam:"endpoint"` so that they are encoded/decoded by Beam.
Endpoint string `beam:"endpoint"`
}

// Call posts webapi.Request's JSON Payload in this example to myCaller's Endpoint.
// Returns a webapi.Response containing the HTTP response body.
func (caller *myCaller) Call(ctx context.Context, request *webapi.Request) (*webapi.Response, error) {
resp, err := http.Post(caller.Endpoint, "application/json", bytes.NewBuffer(request.Payload))
if err != nil {
return nil, err
}
// Close the body so the underlying connection can be released.
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
return &webapi.Response{
Payload: body,
}, nil
}

To use the Caller in a Beam PTransform, provide it to the Call func, which returns a tuple of PCollections:
one for successful responses and another for any errors.

requests := // PCollection of *webapi.Request.
responses, failures := Call(&myCaller{})
Review comment (Contributor):

A few things about this example:

  1. Not providing the scope parameter.
  2. Not providing the input pcollection parameter.
  3. Not namespacing it as webapi.Call which is how a user would actually see the code.
  4. It doesn't look like this "Call" function exists yet?
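
To illustrate the call shape these points ask for, here is a plain-Go stand-in, not Beam code. Since `webapi.Call` does not exist yet, the `Call` function below is hypothetical: slices stand in for PCollections, there is no equivalent of `beam.Scope` (a real transform would presumably take one as its first argument), and the `Failure` type is an assumed shape for dead letter records. What it does show is the input collection being passed explicitly and two outputs coming back.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Request and Response are local stand-ins for the types in this PR.
type Request struct{ Payload []byte }
type Response struct{ Payload []byte }

// Caller is a local copy of the interface proposed in this PR.
type Caller interface {
	Call(ctx context.Context, request *Request) (*Response, error)
}

// Failure is a hypothetical dead letter record pairing a request with its error.
type Failure struct {
	Request *Request
	Err     error
}

// Call is a hypothetical in-memory stand-in for the transform. It takes the
// input collection explicitly and returns successes and failures separately.
func Call(ctx context.Context, caller Caller, requests []*Request) ([]*Response, []Failure) {
	var responses []*Response
	var failures []Failure
	for _, req := range requests {
		resp, err := caller.Call(ctx, req)
		if err != nil {
			failures = append(failures, Failure{Request: req, Err: err})
			continue
		}
		responses = append(responses, resp)
	}
	return responses, failures
}

// rejectEmpty echoes non-empty payloads and rejects empty ones.
type rejectEmpty struct{}

func (rejectEmpty) Call(_ context.Context, req *Request) (*Response, error) {
	if len(req.Payload) == 0 {
		return nil, errors.New("empty payload")
	}
	return &Response{Payload: req.Payload}, nil
}

// splitCounts runs the payloads through Call and reports how many
// landed in each output.
func splitCounts(payloads ...string) (int, int) {
	var requests []*Request
	for _, p := range payloads {
		requests = append(requests, &Request{Payload: []byte(p)})
	}
	responses, failures := Call(context.Background(), rejectEmpty{}, requests)
	return len(responses), len(failures)
}

func main() {
	ok, failed := splitCounts("a", "", "b")
	fmt.Println(ok, failed)
}
```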

*/
package webapi
209 changes: 209 additions & 0 deletions sdks/go/pkg/beam/io/webapi/webapi.go
@@ -0,0 +1,209 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package webapi

import (
"bytes"
"context"
"fmt"
"reflect"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
)

func init() {
beam.RegisterType(reflect.TypeOf((*wrappedCallerOnlyUserType)(nil)).Elem())
beam.RegisterType(reflect.TypeOf((*Request)(nil)).Elem())
beam.RegisterType(reflect.TypeOf((*Response)(nil)).Elem())
register.Emitter1[*Response]()
register.DoFn3x1[context.Context, *Request, func(*Response), error](&callerFn{})
}

// Caller is an interface to a Web API endpoint.
type Caller interface {

// Call a Web API endpoint with a Request, yielding a Response.
Call(ctx context.Context, request *Request) (*Response, error)
}

// SetupTeardown declares the methods called during the setup and teardown phases of a DoFn lifecycle.
// Some clients to Web APIs may need to establish resources or network connections prior to
// calling a Web API. This is a separate interface from a Caller since some Web API endpoints
// do not require this. An internal hybrid interface is hidden for user convenience so that
// a developer need only provide what makes sense for the API read or write task.
type SetupTeardown interface {

// Setup is called during a DoFn setup lifecycle method.
// Clients that need to instantiate resources should do so within an implementation of this method.
Setup(ctx context.Context) error

// Teardown is called during a DoFn teardown lifecycle method.
// Clients that need to destroy or close resources should do so within an implementation of this method.
Teardown(ctx context.Context) error
}

// Request holds the encoded payload of a Web API request.
// Caller implementations are responsible for decoding the Payload into the type required
// to fulfill the Web API request.
type Request struct {

// Payload is the encoded Web API request.
Payload []byte `beam:"payload"`
Review comment (Contributor):

Question: Is this the approach used for the Java equivalent?

That is, the input is encoded, and then must be decoded by the caller?

While this is flexible, the big downside of this approach is that encoding/decoding is typically the most expensive operation, so forcing it renders optimizations like fusion ineffective without significant user-side work.
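
As a rough picture of the extra work in question, the sketch below shows the round trip a user would go through with a byte-payload Request. JSON is an assumption here (the PR does not prescribe an encoding), and `translateRequest`, `encodeRequest`, `decodeRequest`, and `roundTrip` are hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Request is a local stand-in for the byte-payload type in this PR.
type Request struct{ Payload []byte }

// translateRequest is a hypothetical typed request a user might start from.
type translateRequest struct {
	Text string `json:"text"`
	Lang string `json:"lang"`
}

// encodeRequest is the step pipeline code would run before the transform.
func encodeRequest(in translateRequest) (*Request, error) {
	b, err := json.Marshal(in)
	if err != nil {
		return nil, fmt.Errorf("encode request: %w", err)
	}
	return &Request{Payload: b}, nil
}

// decodeRequest is the step a Caller would run before calling the Web API.
func decodeRequest(req *Request) (translateRequest, error) {
	var out translateRequest
	if err := json.Unmarshal(req.Payload, &out); err != nil {
		return translateRequest{}, fmt.Errorf("decode request: %w", err)
	}
	return out, nil
}

// roundTrip performs both steps, which a typed element would not need.
func roundTrip(in translateRequest) (translateRequest, error) {
	req, err := encodeRequest(in)
	if err != nil {
		return translateRequest{}, err
	}
	return decodeRequest(req)
}

func main() {
	out, err := roundTrip(translateRequest{Text: "hello", Lang: "fr"})
	fmt.Println(out, err)
}
```

Every element pays for one marshal and one unmarshal on top of whatever coding the runner already does between stages.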

Review comment (Contributor):

Note, we can make this generic instead of forcing inefficient user-side encoding and decoding; it just requires additional registration steps due to the current SDK's limitations with respect to registering types.

eg.

The closest to best experience I think we could manage is:

func init() {
   webapi.RegisterCaller(&myCaller{})
}


...
responses, failures := webapi.Call(s, &myCaller{/*config*/}, requests)
...

With the most recent Go versions (1.21 I think?), that function should be able to infer the types.

Make the interface and implementation types generic; this avoids the extra wrapper types for Request and Response.

The callerFn type can then become generic on the user's caller type and on the request and response types of the Caller interface's methods, which are also pushed into the interface.
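
A minimal sketch of that generic shape, using only the standard library. It leaves out Beam registration and serialization entirely, and `upperCaller` and `process` are hypothetical names for illustration. Type arguments are spelled out explicitly so the sketch compiles on any Go version with generics; newer versions may be able to infer them.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// Caller is a hypothetical generic variant of the interface in this PR:
// request and response types are pushed into the interface itself.
type Caller[Req, Resp any] interface {
	Call(ctx context.Context, request Req) (Resp, error)
}

// callerFn is generic on the user's caller type, the request type, and
// the response type, so no Request/Response wrapper types are needed.
type callerFn[C Caller[Req, Resp], Req, Resp any] struct {
	Caller C
}

// ProcessElement calls the user's Caller and emits the typed response.
func (fn *callerFn[C, Req, Resp]) ProcessElement(ctx context.Context, req Req, emit func(Resp)) error {
	resp, err := fn.Caller.Call(ctx, req)
	if err != nil {
		return fmt.Errorf("error Call(%T) %w", fn.Caller, err)
	}
	emit(resp)
	return nil
}

// newCallerFn builds a callerFn for a concrete caller type.
func newCallerFn[C Caller[Req, Resp], Req, Resp any](caller C) *callerFn[C, Req, Resp] {
	return &callerFn[C, Req, Resp]{Caller: caller}
}

// upperCaller is a hypothetical user Caller working on plain strings.
type upperCaller struct{}

func (*upperCaller) Call(_ context.Context, request string) (string, error) {
	return strings.ToUpper(request), nil
}

// process feeds the inputs through a callerFn and collects emitted values.
func process(inputs ...string) ([]string, error) {
	// Explicit type arguments; inference may make these unnecessary.
	fn := newCallerFn[*upperCaller, string, string](&upperCaller{})
	var out []string
	emit := func(s string) { out = append(out, s) }
	for _, in := range inputs {
		if err := fn.ProcessElement(context.Background(), in, emit); err != nil {
			return nil, err
		}
	}
	return out, nil
}

func main() {
	out, err := process("a", "b")
	fmt.Println(out, err)
}
```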

}

// Response holds the encoded payload of a Web API response.
// Caller implementations are responsible for encoding the Web API response into the Payload.
type Response struct {

// Payload is the encoded Web API response.
Payload []byte `beam:"payload"`
}

// newCallerFn instantiates a callerFn from a Caller. It wraps an encoded representation of the user defined Caller in a
// wrappedCallerOnlyUserType that serves as a noop SetupTeardown. This allows the user developer
// to only concern themselves with the smallest interface necessary to achieve their Web API read or write task
// while also maintaining a single code path for encoding and decoding callerFn's dependencies.
func newCallerFn(caller Caller) (*callerFn, error) {
t := reflect.TypeOf(caller)
enc := beam.NewElementEncoder(t)
buf := bytes.Buffer{}
if err := enc.Encode(caller, &buf); err != nil {
return nil, fmt.Errorf("Encode(%T) err %w", caller, err)
}
wrapped := &wrappedCallerOnlyUserType{
SerializedIFace: buf.Bytes(),
WrappedIFace: beam.EncodedType{
T: t,
},
}
return newCallerFnFromHybridIFace(wrapped)
}

// newCallerFnFromHybridIFace instantiates a callerFn from a hybrid Caller and SetupTeardown interface.
// It encodes the type using a beam.ElementEncoder, storing these artifacts in the callerFn instance.
func newCallerFnFromHybridIFace(caller callerSetupTeardown) (*callerFn, error) {
t := reflect.TypeOf(caller)
enc := beam.NewElementEncoder(t)
buf := bytes.Buffer{}
if err := enc.Encode(caller, &buf); err != nil {
return nil, fmt.Errorf("Encode(%T) err %w", caller, err)
}
return &callerFn{
SerializedIFace: buf.Bytes(),
WrappedIFace: beam.EncodedType{
T: t,
},
}, nil
}

// callerFn is a DoFn that processes Request elements into Response elements by
// invoking a user defined Caller or callerSetupTeardown.
type callerFn struct {
SerializedIFace []byte `json:"serialized_i_face"`
WrappedIFace beam.EncodedType `json:"wrapped_i_face"`
iface callerSetupTeardown
}

// Setup decodes the user defined Caller or callerSetupTeardown.
// It forwards the ctx to successfully decoded SetupTeardown's Setup method.
func (fn *callerFn) Setup(ctx context.Context) error {
dec := beam.NewElementDecoder(fn.WrappedIFace.T)
buf := bytes.NewReader(fn.SerializedIFace)
iface, err := dec.Decode(buf)
if err != nil {
return fmt.Errorf("dec.Decode(%T) err: %w", fn.WrappedIFace.T, err)
}

cst, ok := iface.(callerSetupTeardown)
if !ok {
return fmt.Errorf("%T is not a callerSetupTeardown type", iface)
}

fn.iface = cst

return fn.iface.Setup(ctx)
}

// ProcessElement invokes a Caller's Call method with the Request and emits the Response upon success.
func (fn *callerFn) ProcessElement(ctx context.Context, req *Request, emit func(*Response)) error {
if fn.iface == nil {
return fmt.Errorf("callerFn iface is nil")
}
resp, err := fn.iface.Call(ctx, req)
if err != nil {
return fmt.Errorf("error Call(%T) %w", fn.iface, err)
}
emit(resp)
return nil
}

// Teardown forwards the ctx to the decoded SetupTeardown's Teardown method.
func (fn *callerFn) Teardown(ctx context.Context) error {
if fn.iface != nil {
return fn.iface.Teardown(ctx)
}
return nil
}

// callerSetupTeardown is a hybrid Caller and SetupTeardown interface.
// It is hidden to allow the user developer to focus on Web API needs for situations that just need
// a Caller implementation or those that need both.
type callerSetupTeardown interface {
Caller
SetupTeardown
}

// wrappedCallerOnlyUserType is a callerSetupTeardown hybrid interface useful for settings where a user developer
// need only concern themselves with using a Caller for their Web API read and write tasks. The design goal of this type
// is to enable a single code path for the callerFn when encoding and decoding its dependencies.
type wrappedCallerOnlyUserType struct {
SerializedIFace []byte `json:"serialized_i_face"`
WrappedIFace beam.EncodedType `json:"wrapped_i_face"`
iface Caller
}

// Call forwards the ctx and request to a user developer's Caller Call method.
func (w *wrappedCallerOnlyUserType) Call(ctx context.Context, request *Request) (*Response, error) {
if w.iface == nil {
return nil, fmt.Errorf("%T iface is nil", w)
}
return w.iface.Call(ctx, request)
}

// Setup decodes a user developer's Caller.
func (w *wrappedCallerOnlyUserType) Setup(_ context.Context) error {
dec := beam.NewElementDecoder(w.WrappedIFace.T)
buf := bytes.NewReader(w.SerializedIFace)
iface, err := dec.Decode(buf)
if err != nil {
return fmt.Errorf("dec.Decode(%T) err: %w", w.WrappedIFace.T, err)
}

cst, ok := iface.(Caller)
if !ok {
return fmt.Errorf("%T is not a Caller type", iface)
}

w.iface = cst

return nil
}

// Teardown is a noop method.
func (w *wrappedCallerOnlyUserType) Teardown(_ context.Context) error {
return nil
}
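
The Caller-only versus hybrid split in the file above can be pictured without any serialization. The sketch below is a simplified illustration and not the PR's implementation: it skips the encode/decode step altogether, and `noopSetupTeardown`, `asHybrid`, `echoCaller`, `connCaller`, and `lifecycle` are hypothetical names. It uses a type assertion to decide whether a Caller needs a no-op lifecycle wrapper.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Local copies of the PR's types so the sketch compiles on its own.
type Request struct{ Payload []byte }
type Response struct{ Payload []byte }

type Caller interface {
	Call(ctx context.Context, request *Request) (*Response, error)
}

type SetupTeardown interface {
	Setup(ctx context.Context) error
	Teardown(ctx context.Context) error
}

type callerSetupTeardown interface {
	Caller
	SetupTeardown
}

// noopSetupTeardown embeds a Caller and adds no-op lifecycle methods.
type noopSetupTeardown struct{ Caller }

func (noopSetupTeardown) Setup(context.Context) error    { return nil }
func (noopSetupTeardown) Teardown(context.Context) error { return nil }

// asHybrid returns caller unchanged when it already implements
// SetupTeardown, and wraps it with no-op lifecycle methods otherwise.
func asHybrid(caller Caller) callerSetupTeardown {
	if cst, ok := caller.(callerSetupTeardown); ok {
		return cst
	}
	return noopSetupTeardown{Caller: caller}
}

// echoCaller implements only Caller.
type echoCaller struct{}

func (echoCaller) Call(_ context.Context, req *Request) (*Response, error) {
	return &Response{Payload: req.Payload}, nil
}

// connCaller implements both interfaces and refuses calls before Setup.
type connCaller struct{ ready bool }

func (c *connCaller) Setup(context.Context) error    { c.ready = true; return nil }
func (c *connCaller) Teardown(context.Context) error { c.ready = false; return nil }
func (c *connCaller) Call(_ context.Context, req *Request) (*Response, error) {
	if !c.ready {
		return nil, errors.New("Setup was not called")
	}
	return &Response{Payload: req.Payload}, nil
}

// lifecycle runs Setup, one Call, and Teardown, returning the echoed payload.
func lifecycle(caller Caller, payload string) (string, error) {
	ctx := context.Background()
	hybrid := asHybrid(caller)
	if err := hybrid.Setup(ctx); err != nil {
		return "", err
	}
	resp, callErr := hybrid.Call(ctx, &Request{Payload: []byte(payload)})
	if err := hybrid.Teardown(ctx); err != nil {
		return "", err
	}
	if callErr != nil {
		return "", callErr
	}
	return string(resp.Payload), nil
}

func main() {
	fmt.Println(lifecycle(echoCaller{}, "x"))
	fmt.Println(lifecycle(&connCaller{}, "y"))
}
```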
120 changes: 120 additions & 0 deletions sdks/go/pkg/beam/io/webapi/webapi_test.go
@@ -0,0 +1,120 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package webapi

import (
"context"
"reflect"
"testing"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/google/go-cmp/cmp"
)

func init() {
beam.RegisterType(reflect.TypeOf((*mockCaller)(nil)).Elem())
beam.RegisterType(reflect.TypeOf((*mockCallerSetupTeardown)(nil)).Elem())
}

func Test_newCallerFnFromHybridIFace(t *testing.T) {
ctx := context.Background()
caller := &mockCallerSetupTeardown{}
fn, err := newCallerFnFromHybridIFace(caller)
if err != nil {
t.Fatalf("error newCallerFnFromHybridIFace(%T): %v", caller, err)
}
if err = fn.Setup(ctx); err != nil {
t.Fatalf("error Setup() %v", err)
}

wantP := []byte("test payload")
if fn.iface == nil {
t.Fatalf("callerFn iface is nil")
}

resp, err := fn.iface.Call(ctx, &Request{
Payload: wantP,
})

if err != nil {
t.Fatalf("error callerFn iface Call() %v", err)
}

if diff := cmp.Diff(wantP, resp.Payload); diff != "" {
t.Fatalf("error Payload mismatch (-want +got):\n%s", diff)
}

if err = fn.Teardown(ctx); err != nil {
t.Fatalf("error Teardown() %v", err)
}
}

func Test_newCallerFn(t *testing.T) {
ctx := context.Background()
caller := &mockCaller{}
fn, err := newCallerFn(caller)
if err != nil {
t.Fatalf("error newCallerFn(%T): %v", caller, err)
}

if err = fn.Setup(ctx); err != nil {
t.Fatalf("error Setup() %v", err)
}

wantP := []byte("test payload")
if fn.iface == nil {
t.Fatalf("callerFn iface is nil")
}

resp, err := fn.iface.Call(ctx, &Request{
Payload: wantP,
})

if err != nil {
t.Fatalf("error callerFn iface Call() %v", err)
}

if diff := cmp.Diff(wantP, resp.Payload); diff != "" {
t.Fatalf("error Payload mismatch (-want +got):\n%s", diff)
}

if err = fn.Teardown(ctx); err != nil {
t.Fatalf("error Teardown() %v", err)
}
}

type mockCaller struct{}

func (m *mockCaller) Call(_ context.Context, request *Request) (*Response, error) {
return &Response{
Payload: request.Payload,
}, nil
}

type mockCallerSetupTeardown struct{}

func (m *mockCallerSetupTeardown) Call(ctx context.Context, request *Request) (*Response, error) {
return &Response{
Payload: request.Payload,
}, nil
}

func (m *mockCallerSetupTeardown) Setup(ctx context.Context) error {
return nil
}

func (m *mockCallerSetupTeardown) Teardown(ctx context.Context) error {
return nil
}