Skip to content

Commit

Permalink
update api for per stream (#13835)
Browse files Browse the repository at this point in the history
  • Loading branch information
cgardens committed Jun 17, 2022
1 parent e208d2c commit e89846c
Show file tree
Hide file tree
Showing 5 changed files with 1,445 additions and 19 deletions.
6 changes: 3 additions & 3 deletions airbyte-api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ task generateApiServer(type: GenerateTask) {
'DestinationDefinitionSpecification': 'com.fasterxml.jackson.databind.JsonNode',
'DestinationConfiguration' : 'com.fasterxml.jackson.databind.JsonNode',
'StreamJsonSchema' : 'com.fasterxml.jackson.databind.JsonNode',
'ConnectionStateObject' : 'com.fasterxml.jackson.databind.JsonNode',
'StateBlob' : 'com.fasterxml.jackson.databind.JsonNode',
]

generateApiDocumentation = false
Expand Down Expand Up @@ -70,7 +70,7 @@ task generateApiClient(type: GenerateTask) {
'DestinationDefinitionSpecification': 'com.fasterxml.jackson.databind.JsonNode',
'DestinationConfiguration' : 'com.fasterxml.jackson.databind.JsonNode',
'StreamJsonSchema' : 'com.fasterxml.jackson.databind.JsonNode',
'ConnectionStateObject' : 'com.fasterxml.jackson.databind.JsonNode',
'StateBlob' : 'com.fasterxml.jackson.databind.JsonNode',
]

library = "native"
Expand Down Expand Up @@ -103,7 +103,7 @@ task generateApiDocs(type: GenerateTask) {
'DestinationDefinitionSpecification': 'com.fasterxml.jackson.databind.JsonNode',
'DestinationConfiguration' : 'com.fasterxml.jackson.databind.JsonNode',
'StreamJsonSchema' : 'com.fasterxml.jackson.databind.JsonNode',
'ConnectionStateObject' : 'com.fasterxml.jackson.databind.JsonNode',
'StateBlob' : 'com.fasterxml.jackson.databind.JsonNode',
]

generateApiDocumentation = false
Expand Down
168 changes: 160 additions & 8 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1402,6 +1402,29 @@ paths:
$ref: "#/components/responses/NotFoundResponse"
"422":
$ref: "#/components/responses/InvalidInputResponse"
/v1/state/type/get:
post:
tags:
- connection
summary: Fetch the current type for a connection.
operationId: getStateType
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/ConnectionIdRequestBody"
required: true
responses:
"200":
description: Successful operation
content:
application/json:
schema:
$ref: "#/components/schemas/ConnectionStateType"
"404":
$ref: "#/components/responses/NotFoundResponse"
"422":
$ref: "#/components/responses/InvalidInputResponse"
/v1/connections/search:
post:
tags:
Expand Down Expand Up @@ -3585,7 +3608,6 @@ components:
type: string
description: Stream's name.
jsonSchema:
description: Stream schema using Json Schema specs.
$ref: "#/components/schemas/StreamJsonSchema"
supportedSyncModes:
type: array
Expand All @@ -3610,6 +3632,7 @@ components:
type: string
description: Optional Source-defined namespace. Airbyte streams from the same sources should have the same namespace. Currently only used by JDBC destinations to determine what schema to write to.
StreamJsonSchema:
description: Stream schema using Json Schema specs.
type: object
AirbyteStreamConfiguration:
description: the mutable part of the stream to configure the destination
Expand Down Expand Up @@ -3674,7 +3697,6 @@ components:
configId:
type: string
pagination:
type: object
$ref: "#/components/schemas/Pagination"
JobIdRequestBody:
type: object
Expand Down Expand Up @@ -3707,6 +3729,19 @@ components:
format: int64
status:
$ref: "#/components/schemas/JobStatus"
streams:
type: array
items:
$ref: "#/components/schemas/StreamDescriptor"
StreamDescriptor:
type: object
required:
- name
properties:
name:
type: string
namespace:
type: string
JobDebugRead:
type: object
required:
Expand Down Expand Up @@ -3980,15 +4015,132 @@ components:
$ref: "#/components/schemas/SynchronousJobRead"
ConnectionState:
type: object
description: Contains the state for a connection. The stateType field identifies what type of state it is. Only the field corresponding to that type will be set, the rest will be null. If stateType=not_set, then none of the fields will be set.
required:
- connectionId
- stateType
properties:
stateType:
$ref: "#/components/schemas/ConnectionStateType"
connectionId:
$ref: "#/components/schemas/ConnectionId"
state:
$ref: "#/components/schemas/ConnectionStateObject"
ConnectionStateObject:
state: # legacy state object
$ref: "#/components/schemas/StateBlob"
streamState:
type: array
items:
$ref: "#/components/schemas/StreamState"
globalState:
$ref: "#/components/schemas/GlobalState"
StateBlob:
type: object
StreamState:
type: object
required:
- streamDescriptor
properties:
streamDescriptor:
$ref: "#/components/schemas/StreamDescriptor"
streamState:
$ref: "#/components/schemas/StateBlob"
GlobalState:
type: object
required:
- streamStates
properties:
shared_state:
$ref: "#/components/schemas/StateBlob"
streamStates:
type: array
items:
$ref: "#/components/schemas/StreamState"
ConnectionStateType:
type: string
enum:
- global
- stream
- legacy
- not_set
CatalogDiff:
type: object
description: Describes the difference between two Airbyte catalogs.
required:
- transforms
properties:
transforms:
description: list of stream transformations. order does not matter.
type: array
items:
$ref: "#/components/schemas/StreamTransform"
StreamTransform:
type: object
required:
- transformType
properties:
transformType:
type: string
enum:
- add_stream
- remove_stream
- update_stream
addStream:
$ref: "#/components/schemas/StreamDescriptor"
removeStream:
$ref: "#/components/schemas/StreamDescriptor"
updateStream:
type: array
description: list of field transformations. order does not matter.
items:
$ref: "#/components/schemas/FieldTransform"
FieldTransform:
type: object
description: "Describes the difference between two Streams."
required:
- transformType
properties:
transformType:
type: string
enum:
- add_field
- remove_field
- update_field_schema
addField:
$ref: "#/components/schemas/FieldNameAndSchema"
removeField:
$ref: "#/components/schemas/FieldNameAndSchema"
updateFieldSchema:
$ref: "#/components/schemas/FieldSchemaUpdate"
FieldNameAndSchema:
type: object
required:
- fieldName
- fieldSchema
properties:
fieldName:
type: array
items:
type: string
fieldSchema:
$ref: "#/components/schemas/FieldSchema"
FieldSchemaUpdate:
type: object
required:
- fieldName
- oldSchema
- newSchema
properties:
fieldName:
type: array
items:
type: string
oldSchema:
$ref: "#/components/schemas/FieldSchema"
newSchema:
$ref: "#/components/schemas/FieldSchema"
FieldSchema:
description: JSONSchema representation of the field
type: object
additionalProperties: true
ActorDefinitionResourceRequirements:
description: actor definition specific resource requirements. if default is set, these are the requirements that should be set for ALL jobs run for this actor definition. it is overriden by the job type specific configurations. if not set, the platform will use defaults. these values will be overriden by configuration at the connection level.
type: object
Expand Down Expand Up @@ -4099,10 +4251,8 @@ components:
$ref: "#/components/schemas/DbMigrationRead"
# OAuth
OAuthConfiguration:
description: OAuth specific blob.
description: The values required to configure OAuth flows. The schema for this must match the `OAuthConfigSpecification.oauthUserInputFromConnectorConfigSpecification` schema.
OAuthInputConfiguration:
description: The values required to configure OAuth flows.
The schema for this must match the `OAuthConfigSpecification.oauthUserInputFromConnectorConfigSpecification` schema.
$ref: "#/components/schemas/OAuthConfiguration"
AdvancedAuth:
type: object
Expand Down Expand Up @@ -4372,6 +4522,8 @@ components:
catalogId:
type: string
format: uuid
catalogDiff:
$ref: "#/components/schemas/CatalogDiff"
WebBackendConnectionReadList:
type: object
required:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.airbyte.api.model.generated.ConnectionReadList;
import io.airbyte.api.model.generated.ConnectionSearch;
import io.airbyte.api.model.generated.ConnectionState;
import io.airbyte.api.model.generated.ConnectionStateType;
import io.airbyte.api.model.generated.ConnectionUpdate;
import io.airbyte.api.model.generated.CustomDestinationDefinitionCreate;
import io.airbyte.api.model.generated.CustomDestinationDefinitionUpdate;
Expand Down Expand Up @@ -727,6 +728,11 @@ public ConnectionState getState(final ConnectionIdRequestBody connectionIdReques
return execute(() -> schedulerHandler.getState(connectionIdRequestBody));
}

@Override
public ConnectionStateType getStateType(final ConnectionIdRequestBody connectionIdRequestBody) {
return null;
}

// SCHEDULER
@Override
public CheckConnectionRead executeSourceCheckConnection(final SourceCoreConfig sourceConfig) {
Expand Down Expand Up @@ -849,7 +855,7 @@ public boolean canImportDefinitons() {
return archiveHandler.canImportDefinitions();
}

private <T> T execute(final HandlerCall<T> call) {
private static <T> T execute(final HandlerCall<T> call) {
try {
return call.call();
} catch (final ConfigNotFoundException e) {
Expand Down
4 changes: 2 additions & 2 deletions airbyte-webapp/src/core/request/AirbyteClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -413,13 +413,13 @@ export interface ActorDefinitionResourceRequirements {
jobSpecific?: JobTypeResourceLimit[];
}

export interface ConnectionStateObject {
export interface StateBlob {
[key: string]: any;
}

export interface ConnectionState {
connectionId: ConnectionId;
state?: ConnectionStateObject;
state?: StateBlob;
}

export type CheckConnectionReadStatus = typeof CheckConnectionReadStatus[keyof typeof CheckConnectionReadStatus];
Expand Down
Loading

0 comments on commit e89846c

Please sign in to comment.