Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update api for per stream #13835

Merged
merged 5 commits into from
Jun 17, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 162 additions & 8 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1402,6 +1402,29 @@ paths:
$ref: "#/components/responses/NotFoundResponse"
"422":
$ref: "#/components/responses/InvalidInputResponse"
/v1/state/type/get:
post:
tags:
- connection
summary: Fetch the current type for a connection.
operationId: getStateType
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/ConnectionIdRequestBody"
required: true
responses:
"200":
description: Successful operation
content:
application/json:
schema:
$ref: "#/components/schemas/ConnectionStateType"
"404":
$ref: "#/components/responses/NotFoundResponse"
"422":
$ref: "#/components/responses/InvalidInputResponse"
/v1/connections/search:
post:
tags:
Expand Down Expand Up @@ -3585,7 +3608,6 @@ components:
type: string
description: Stream's name.
jsonSchema:
description: Stream schema using Json Schema specs.
$ref: "#/components/schemas/StreamJsonSchema"
supportedSyncModes:
type: array
Expand All @@ -3610,6 +3632,7 @@ components:
type: string
description: Optional Source-defined namespace. Airbyte streams from the same sources should have the same namespace. Currently only used by JDBC destinations to determine what schema to write to.
StreamJsonSchema:
description: Stream schema using Json Schema specs.
type: object
AirbyteStreamConfiguration:
description: the mutable part of the stream to configure the destination
Expand Down Expand Up @@ -3674,7 +3697,6 @@ components:
configId:
type: string
pagination:
type: object
$ref: "#/components/schemas/Pagination"
JobIdRequestBody:
type: object
Expand Down Expand Up @@ -3707,6 +3729,19 @@ components:
format: int64
status:
$ref: "#/components/schemas/JobStatus"
streams:
type: array
items:
$ref: "#/components/schemas/StreamDescriptor"
StreamDescriptor:
type: object
required:
- name
properties:
name:
type: string
namespace:
type: string
JobDebugRead:
type: object
required:
Expand Down Expand Up @@ -3980,15 +4015,134 @@ components:
$ref: "#/components/schemas/SynchronousJobRead"
ConnectionState:
type: object
description: Contains the state for a connection. The stateType field identifies what type of state it is. Only the field corresponding to that type will be set, the rest will be null. If stateType=not_set, then none of the fields will be set.
required:
- connectionId
- stateType
properties:
stateType:
$ref: "#/components/schemas/ConnectionStateType"
connectionId:
$ref: "#/components/schemas/ConnectionId"
state:
$ref: "#/components/schemas/ConnectionStateObject"
ConnectionStateObject:
state: # legacy state object
$ref: "#/components/schemas/StateBlob"
streamState:
type: array
items:
$ref: "#/components/schemas/StreamState"
globalState:
$ref: "#/components/schemas/AirbyteGlobalState"
StateBlob:
type: object
StreamState:
type: object
required:
- streamDescriptor
properties:
streamDescriptor:
$ref: "#/components/schemas/StreamDescriptor"
streamState:
$ref: "#/components/schemas/StateBlob"
AirbyteGlobalState:
cgardens marked this conversation as resolved.
Show resolved Hide resolved
type: object
required:
- streamStates
properties:
shared_state:
$ref: "#/components/schemas/StateBlob"
streamStates:
type: array
items:
$ref: "#/components/schemas/StreamState"
ConnectionStateType:
type: string
enum:
- global
- stream
- legacy
- not_set
CatalogDiff:
type: object
description: "Describes the difference between two Airbyte catalogs."
required:
- transformations
properties:
transformations:
description: list of stream transformations. order does not matter.
type: array
items:
$ref: "#/components/schemas/StreamTransform"
StreamTransform:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Random thought - should the CatalogDiff contain other changes to streams besides just additions, removals, and field schema changes?

For example, should we be communicating changes to any of the following fields for a stream in the CatalogDiff?

  • supportedSyncModes
  • sourceDefinedCursor
  • defaultCursorField
  • sourceDefinedPrimaryKey
  • namespace

My guess is that we aren't putting these changes in the diff because the diff is just meant to communicate schema changes, but I wanted to double check.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great point! in the future, yes. i don't think we need it at this state. that's why i structure this as a list with each item having a type instead of a list per type. i think the number of types will grow.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense! Yeah definitely not worth adding now if we don't need it at the moment, and this seems like an extensible approach that will allow us to add more stream transformation types in the future as we need them 👍

type: object
required:
- transformType
properties:
transformType:
type: string
enum:
- add_stream
- remove_stream
- update_stream
addStream:
$ref: "#/components/schemas/StreamDescriptor"
removeStream:
$ref: "#/components/schemas/StreamDescriptor"
updateStream:
type: array
description: list of field transformations. order does not matter.
items:
$ref: "#/components/schemas/FieldTransform"
FieldTransform:
type: object
description: "Describes the difference between two Streams."
required:
- transformType
properties:
transformType:
type: string
enum:
- add_field
- remove_field
- update_field_type
addField:
$ref: "#/components/schemas/FieldNameAndType"
removeField:
$ref: "#/components/schemas/FieldNameAndType"
updateFieldType:
$ref: "#/components/schemas/FieldTypeUpdate"
FieldNameAndType:
type: object
required:
- fieldName
- fieldType
properties:
fieldName:
type: array
items:
type: string
fieldType:
type: array
items:
type: string
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the discussion I had with Tim, this should be a schema and be an object with no constraint (i.e. additionalProperties = true).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@benmoriceau aha! thanks for linking the comment in the google doc. i hadn't seen that. long term, i'm not sure that passing the schema around is a great pattern, but since that is the pattern already for how the FE handles the catalog, it makes sense, i'll switch to that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is being changed to a schema, maybe we should also rename these properties from type -> schema, e.g. update_field_schema, fieldSchema, FieldSchemaUpdate, etc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed.

FieldTypeUpdate:
type: object
required:
- fieldName
- oldType
- newType
properties:
fieldName:
type: array
items:
type: string
oldType:
type: array
items:
type: string
newType:
type: array
items:
type: string
ActorDefinitionResourceRequirements:
description: actor definition specific resource requirements. if default is set, these are the requirements that should be set for ALL jobs run for this actor definition. it is overriden by the job type specific configurations. if not set, the platform will use defaults. these values will be overriden by configuration at the connection level.
type: object
Expand Down Expand Up @@ -4099,10 +4253,8 @@ components:
$ref: "#/components/schemas/DbMigrationRead"
# OAuth
OAuthConfiguration:
description: OAuth specific blob.
description: The values required to configure OAuth flows. The schema for this must match the `OAuthConfigSpecification.oauthUserInputFromConnectorConfigSpecification` schema.
OAuthInputConfiguration:
description: The values required to configure OAuth flows.
The schema for this must match the `OAuthConfigSpecification.oauthUserInputFromConnectorConfigSpecification` schema.
$ref: "#/components/schemas/OAuthConfiguration"
AdvancedAuth:
type: object
Expand Down Expand Up @@ -4372,6 +4524,8 @@ components:
catalogId:
type: string
format: uuid
catalogDiff:
$ref: "#/components/schemas/CatalogDiff"
WebBackendConnectionReadList:
type: object
required:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.airbyte.api.model.generated.ConnectionReadList;
import io.airbyte.api.model.generated.ConnectionSearch;
import io.airbyte.api.model.generated.ConnectionState;
import io.airbyte.api.model.generated.ConnectionStateType;
import io.airbyte.api.model.generated.ConnectionUpdate;
import io.airbyte.api.model.generated.CustomDestinationDefinitionCreate;
import io.airbyte.api.model.generated.CustomDestinationDefinitionUpdate;
Expand Down Expand Up @@ -727,6 +728,11 @@ public ConnectionState getState(final ConnectionIdRequestBody connectionIdReques
return execute(() -> schedulerHandler.getState(connectionIdRequestBody));
}

@Override
public ConnectionStateType getStateType(final ConnectionIdRequestBody connectionIdRequestBody) {
return null;
}
Comment on lines +732 to +734
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just checking - do we have an issue for implementing this endpoint specifically? Looking at the whimsical board, I only see issues for implementing the CatalogDiff behavior

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was bundling it in with this issue #13648.


// SCHEDULER
@Override
public CheckConnectionRead executeSourceCheckConnection(final SourceCoreConfig sourceConfig) {
Expand Down Expand Up @@ -849,7 +855,7 @@ public boolean canImportDefinitons() {
return archiveHandler.canImportDefinitions();
}

private <T> T execute(final HandlerCall<T> call) {
private static <T> T execute(final HandlerCall<T> call) {
try {
return call.call();
} catch (final ConfigNotFoundException e) {
Expand Down
Loading