Skip to content

Commit

Permalink
Update schema (#13573)
Browse files Browse the repository at this point in the history
* Update schema

* generate python

* Stream as an object

* PR comments

* generate python

* rm unused required

* Description the state with no type

* Fix connector build

* Format

* format

Co-authored-by: cgardens <charles@airbyte.io>
  • Loading branch information
benmoriceau and cgardens committed Jun 10, 2022
1 parent 7134625 commit 704dd8b
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 28 deletions.
32 changes: 24 additions & 8 deletions airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,23 @@ class Config:

class AirbyteStateType(Enum):
GLOBAL = "GLOBAL"
PER_STREAM = "PER_STREAM"
STREAM = "STREAM"
LEGACY = "LEGACY"


class StreamDescriptor(BaseModel):
class Config:
extra = Extra.allow

name: str
namespace: Optional[str] = None


class AirbyteStateBlob(BaseModel):
pass

class Config:
extra = Extra.forbid
extra = Extra.allow


class Level(Enum):
Expand Down Expand Up @@ -164,11 +173,18 @@ class OAuthConfigSpecification(BaseModel):

class AirbyteStreamState(BaseModel):
class Config:
extra = Extra.forbid
extra = Extra.allow

stream_descriptor: StreamDescriptor
stream_state: Optional[AirbyteStateBlob] = None


class AirbyteGlobalState(BaseModel):
class Config:
extra = Extra.allow

name: str = Field(..., description="Stream name")
state: AirbyteStateBlob
namespace: Optional[str] = Field(None, description="Optional Source-defined namespace.")
shared_state: Optional[AirbyteStateBlob] = None
stream_states: List[AirbyteStreamState]


class AirbyteTraceMessage(BaseModel):
Expand Down Expand Up @@ -263,9 +279,9 @@ class Config:
extra = Extra.allow

state_type: Optional[AirbyteStateType] = None
stream: Optional[AirbyteStreamState] = None
global_: Optional[AirbyteGlobalState] = Field(None, alias="global")
data: Optional[Dict[str, Any]] = Field(None, description="(Deprecated) the state data")
global_: Optional[AirbyteStateBlob] = Field(None, alias="global")
streams: Optional[List[AirbyteStreamState]] = None


class AirbyteCatalog(BaseModel):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,49 +74,65 @@ definitions:
properties:
state_type:
"$ref": "#/definitions/AirbyteStateType"
stream:
"$ref": "#/definitions/AirbyteStreamState"
global:
"$ref": "#/definitions/AirbyteGlobalState"
data:
description: "(Deprecated) the state data"
type: object
existingJavaType: com.fasterxml.jackson.databind.JsonNode
global:
"$ref": "#/definitions/AirbyteStateBlob"
streams:
type: array
items:
"$ref": "#/definitions/AirbyteStreamState"

AirbyteStateType:
type: string
description: >
The type of state the other fields represent.
If not set, the state data is interpreted as GLOBAL and should be read from the `data` field for backwards compatibility.
GLOBAL means that the state should be read from `global` and means that it represents the state for all the streams.
PER_STREAM means that the state should be read from `streams`. Each item in the list represents the state for the associated stream.
Is set to LEGACY, the state data should be read from the `data` field for backwards compatibility.
If not set, assume the state object is type LEGACY.
GLOBAL means that the state should be read from `global` and means that it represents the state for all the streams. It contains one shared
state and individual stream states.
PER_STREAM means that the state should be read from `stream`. The state present in this field correspond to the isolated state of the
associated stream description.
enum:
- GLOBAL
- PER_STREAM

- STREAM
- LEGACY
AirbyteStreamState:
type: object
description: "per stream state data"
additionalProperties: false
additionalProperties: true
required:
- stream_descriptor
properties:
stream_descriptor:
"$ref": "#/definitions/StreamDescriptor"
stream_state:
"$ref": "#/definitions/AirbyteStateBlob"
AirbyteGlobalState:
type: object
additionalProperties: true
required:
- stream_states
properties:
shared_state:
"$ref": "#/definitions/AirbyteStateBlob"
stream_states:
type: array
items:
"$ref": "#/definitions/AirbyteStreamState"
StreamDescriptor:
type: object
additionalProperties: true
required:
- name
- state
properties:
name:
description: "Stream name"
type: string
state:
"$ref": "#/definitions/AirbyteStateBlob"
namespace:
description: Optional Source-defined namespace.
type: string

AirbyteStateBlob:
type: object
description: "the state data"
additionalProperties: false
additionalProperties: true
existingJavaType: com.fasterxml.jackson.databind.JsonNode

AirbyteLogMessage:
Expand Down

0 comments on commit 704dd8b

Please sign in to comment.