-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Connector-builder server] Allow client to specify record limit and enforce max of 1000 #20575
Conversation
ea87097
to
4285b70
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall LGTM but just a question on test/code structure
@@ -144,6 +144,9 @@ components: | |||
type: object | |||
description: The AirbyteStateMessage object to use as the starting state for this read | |||
# $ref: "#/components/schemas/AirbyteProtocol/definitions/AirbyteStateMessage" | |||
record_limit: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@lmossman do we need to regenerate any typescript code? (if yes we should make it more obvious/automatic for anyone touching this code)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The generated typescript code is not committed to github, and since this is just adding a new optional parameter it won't break any of our code that consumes that generated typescript code. So this shouldn't require any frontend changes
else: | ||
record_limit = min(request_record_limit, max_record_limit) | ||
|
||
with patch.object(DefaultApiImpl, "MAX_RECORD_LIMIT", max_record_limit): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
feels like having to patch in this way breaks encapsulation. Does it make sense to make this an optional param on DefaultApiImpl
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with you, doesn't quite feel right to have MAX_RECORD_LIMIT
on DefaultApiImpl
. WDYT about moving it to a constant outside of the class? We could make it an optional param but it this specific variable doesn't feel to me like it's part of DefaultApiImpl
which gives me a slight preference for making it a constant. But I could be convinced either way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just meant it would be better to patch it using depenency injection i.e: make this an init variable for DefaultApiImpl
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay cool, I think that could be nice and tidy. Will make that change.
]) | ||
request_record_limit = 0 | ||
|
||
with patch.object(DefaultApiImpl, "_create_low_code_adapter", return_value=mock_source_adapter): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same question, is this telling us that our code structure needs to change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a particular structural change you had in mind? I imagine this function was patched out of convenience (it's used throughout the test_default_api.py
file) - looks to me like there are multiple places downstream that we could patch instead if we wanted but that might be a little trickier. As a simple alternative, _create_low_code_adapter
is a staticmethod
and doesn't need to be a method on the class - it could be moved to its own function if we don't want to surface the details of DefaultApiImpl
in tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think one restructuring that could make sense would be to move some implementation out of DefaultApiImpl
, and have a separate class doing the business logic of fetching records. That separate class could take an adapter
object, which could be injected during tests. Thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I made a lighter-weight change than moving all of the record-fetching out of DefaultApiImpl
, injecting the adapter class into DefaultApiImpl.__init__
instead. This cleans up the patches.
a78e01e
to
3e19fc7
Compare
|
||
|
||
class DefaultApiImpl(DefaultApi): | ||
logger = logging.getLogger("airbyte.connector-builder") | ||
|
||
def __init__(self, adapter_cls: Type, max_record_limit: int = 1000): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why are we passing the class instead of the object?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The adapter requires a manifest, which we don't have when DefaultApiImpl
is instantiated.
@@ -105,15 +109,22 @@ async def read_stream(self, stream_read_request_body: StreamReadRequestBody = Bo | |||
Using the provided manifest and config, invokes a sync for the specified stream and returns groups of Airbyte messages | |||
that are produced during the read operation | |||
:param stream_read_request_body: Input parameters to trigger the read operation for a stream | |||
:param limit: The maximum number of records requested by the client |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we explicitly define the range as [1, max_record_limit]?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, good call. Added.
@@ -144,6 +144,9 @@ components: | |||
type: object | |||
description: The AirbyteStateMessage object to use as the starting state for this read | |||
# $ref: "#/components/schemas/AirbyteProtocol/definitions/AirbyteStateMessage" | |||
record_limit: | |||
type: integer | |||
description: Number of records that will be returned to the client from the connector builder (max of 1000) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the minimum and maximum could be defined here so the FE doesn't have to redefine it https://swagger.io/docs/specification/data-models/data-types/#numbers
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
@girarda I've addressed your comments if you want to take another look. |
@@ -27,7 +27,8 @@ | |||
class DefaultApiImpl(DefaultApi): | |||
logger = logging.getLogger("airbyte.connector-builder") | |||
|
|||
def __init__(self, max_record_limit: int = 1000): | |||
def __init__(self, adapter_cls: Type, max_record_limit: int = 1000): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there are two problems with this type signature:
Type
is not very informative (is it the correct type? it seems imported from airbyte_cdk.models which doesn't seem right?)- as a result it's not clear what methods can be called on that type and with what params (e.g: how does the reader know the
adapter_cls
can be called with amanifest
argument?)
Example of a signature which solves these might be:
class CdkAdapter(abc):
<..describe methods we expect this class to have..>
class DefaultApiImpl(..):
def __init__(self, adapter_cls: Callable[[], CdkAdapter], ...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, good call @sherifnada - that was supposed to be lowercase type
. I like your suggestion to use an interface to be clearer about the type and went ahead and made that change.
5468b97
to
58893e1
Compare
f90f75d
to
51a1c08
Compare
What
In order to improve performance of test read requests in the connector builder UI, we want to avoid fetching more data from the source than is necessary to test the connection.
This update allows connector builder user to specify a
record_limit
that will limit the number of records returned to the client when they are testing the connector. Also enforces a maximum limit of 1000 records.This is the server portion of #19500.
How
stream/read
endpoint's to accept an optionalrecord_limit
key in the request body.MAX_RECORD_LIMIT
whose values is 1000; if the limit input by the user is not set or exceeds 1000,MAX_RECORD_LIMIT
will be enforced.messages
generator (which fetches records from the source) once<limit>
records have been seen.Recommended reading order
airbyte-connector-builder-server/connector_builder/impl/default_api.py
airbyte-connector-builder-server/src/main/openapi/openapi.yaml
🚨 User Impact 🚨
No breaking changes. The user will see
<limit>
records when they clickTest
to test the connector, rather than all messages that would have been fetched before this change.Tests
Unit
Put your unit tests output here.
Integration
Put your integration tests output here.
Acceptance
Put your acceptance tests output here.