Skip to content

Commit

Permalink
Add Primary Key to CDK. (#3105)
Browse files Browse the repository at this point in the history
Hold off on publishing the version and updating various downstream consumers for now since a major refactor is happening.
  • Loading branch information
davinchia committed Apr 30, 2021
1 parent 7d0a794 commit fc00d36
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ def as_airbyte_stream(self) -> AirbyteStream:
stream.source_defined_cursor = self.source_defined_cursor
stream.supported_sync_modes.append(SyncMode.incremental)
stream.default_cursor_field = self._wrapped_cursor_field()

keys = self._wrapped_primary_key()
if len(keys) > 0:
stream.source_defined_primary_key = keys

return stream

@property
Expand Down Expand Up @@ -108,6 +113,33 @@ def source_defined_cursor(self) -> bool:
"""
return True

@property
@abstractmethod
def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
"""
:return: string if single primary key, list of strings if composite primary key, list of list of strings if composite primary key consisting of nested fields.
If the stream has no primary keys, return None.
"""

def _wrapped_primary_key(self) -> Optional[List[List[str]]]:
"""
:return: wrap the primary_key property in a list of list of strings required by the Airbyte Stream object.
"""
keys = self.primary_key
if isinstance(keys, str):
return [[keys]]
elif isinstance(keys, list):
wrapped_key = []
for component in keys:
if isinstance(component, str):
wrapped_key.append([component])
elif isinstance(component, list):
wrapped_key.append(component)
else:
raise ValueError("Element must be either list or str.")
else:
raise ValueError("Element must be either list or str.")

def stream_slices(
self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Mapping[str, any]]]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@


class ExchangeRates(HttpStream):

date_field_name = "date"

# HttpStream related fields
Expand Down

0 comments on commit fc00d36

Please sign in to comment.