🐛 Source slack: add channel_filter config var to improve sync performance (#11613)
burmecia committed Apr 11, 2022
1 parent 06c902c commit 2b4ee63
Showing 9 changed files with 79 additions and 31 deletions.
2 changes: 1 addition & 1 deletion airbyte-config/init/src/main/resources/seed/source_definitions.yaml
@@ -724,7 +724,7 @@
- name: Slack
sourceDefinitionId: c2281cee-86f9-4a86-bb48-d23286b4c7bd
dockerRepository: airbyte/source-slack
dockerImageTag: 0.1.14
dockerImageTag: 0.1.15
documentationUrl: https://docs.airbyte.io/integrations/sources/slack
icon: slack.svg
sourceType: api
11 changes: 10 additions & 1 deletion airbyte-config/init/src/main/resources/seed/source_specs.yaml
@@ -7706,7 +7706,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-slack:0.1.14"
- dockerImage: "airbyte/source-slack:0.1.15"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/slack"
connectionSpecification:
@@ -7741,6 +7741,15 @@
description: "Whether to join all channels or to sync data only from channels\
\ the bot is already in. If false, you'll need to manually add the bot\
\ to all the channels from which you'd like to sync messages. "
channel_filter:
type: "array"
default: []
title: "Channel name filter"
description: "A channel name list (without leading '#' char) which limit\
\ the channels from which you'd like to sync. Empty list means no filter."
examples:
- "channel_one"
- "channel_two"
credentials:
title: "Authentication mechanism"
description: "Choose how to authenticate into Slack"
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-slack/Dockerfile
@@ -16,5 +16,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.14
LABEL io.airbyte.version=0.1.15
LABEL io.airbyte.name=airbyte/source-slack
@@ -5,5 +5,6 @@
},
"start_date": "2022-07-22T20:00:00Z",
"lookback_window": 2,
"join_channels": true
"join_channels": true,
"channel_filter": []
}
@@ -8,5 +8,6 @@
},
"start_date": "2021-07-22T20:00:00Z",
"lookback_window": 2,
"join_channels": true
"join_channels": true,
"channel_filter": []
}
@@ -2,5 +2,6 @@
"api_token": "api_token",
"start_date": "2021-07-22T20:00:00Z",
"lookback_window": 2,
"join_channels": true
"join_channels": true,
"channel_filter": []
}
76 changes: 51 additions & 25 deletions airbyte-integrations/connectors/source-slack/source_slack/source.py
@@ -12,7 +12,7 @@
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http import HttpStream, HttpSubStream
from airbyte_cdk.sources.streams.http.requests_native_auth import Oauth2Authenticator, TokenAuthenticator
from pendulum import DateTime, Period

@@ -77,9 +77,21 @@ def data_field(self) -> str:
"""The name of the field in the response which contains the data"""


class Channels(SlackStream):
class ChanneledStream(SlackStream, ABC):
"""Slack stream with channel filter"""

def __init__(self, channel_filter: List[str] = [], **kwargs):
self.channel_filter = channel_filter
super().__init__(**kwargs)


class Channels(ChanneledStream):
data_field = "channels"

@property
def use_cache(self) -> bool:
return True

def path(self, **kwargs) -> str:
return "conversations.list"

@@ -88,9 +100,23 @@ def request_params(self, **kwargs) -> MutableMapping[str, Any]:
params["types"] = "public_channel"
return params

def parse_response(
self,
response: requests.Response,
stream_state: Mapping[str, Any] = None,
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> Iterable[MutableMapping]:
json_response = response.json()
channels = json_response.get(self.data_field, [])
if self.channel_filter:
channels = [channel for channel in channels if channel["name"] in self.channel_filter]
yield from channels


class ChannelMembers(SlackStream):
class ChannelMembers(ChanneledStream):
data_field = "members"
primary_key = ["member_id", "channel_id"]

def path(self, **kwargs) -> str:
return "conversations.members"
@@ -106,7 +132,7 @@ def parse_response(self, response: requests.Response, stream_slice: Mapping[str,
yield {"member_id": member_id, "channel_id": stream_slice["channel_id"]}

def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, any]]]:
channels_stream = Channels(authenticator=self._session.auth)
channels_stream = Channels(authenticator=self._session.auth, channel_filter=self.channel_filter)
for channel_record in channels_stream.read_records(sync_mode=SyncMode.full_refresh):
yield {"channel_id": channel_record["id"]}

@@ -133,7 +159,7 @@ def chunk_date_range(start_date: DateTime, interval=pendulum.duration(days=1)) -
start_date = end_date


class IncrementalMessageStream(SlackStream, ABC):
class IncrementalMessageStream(ChanneledStream, ABC):
data_field = "messages"
cursor_field = "float_ts"
primary_key = ["channel_id", "ts"]
@@ -171,26 +197,17 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late
return current_stream_state


class ChannelMessages(IncrementalMessageStream):
class ChannelMessages(HttpSubStream, IncrementalMessageStream):
def path(self, **kwargs) -> str:
return "conversations.history"

def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, any]]]:
stream_state = stream_state or {}
start_date = pendulum.from_timestamp(stream_state.get(self.cursor_field, self._start_ts))
for period in chunk_date_range(start_date):
yield {"oldest": period.start.timestamp(), "latest": period.end.timestamp()}

def read_records(self, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs) -> Iterable[Mapping[str, Any]]:
# Channel is provided when reading threads
if "channel" in stream_slice:
yield from super().read_records(stream_slice=stream_slice, **kwargs)
else:
# if channel is not provided, then get channels and read accordingly
channels = Channels(authenticator=self._session.auth)
for channel_record in channels.read_records(sync_mode=SyncMode.full_refresh):
stream_slice["channel"] = channel_record["id"]
yield from super().read_records(stream_slice=stream_slice, **kwargs)
for parent_slice in super().stream_slices(sync_mode=SyncMode.full_refresh):
channel = parent_slice["parent"]
for period in chunk_date_range(start_date):
yield {"channel": channel["id"], "oldest": period.start.timestamp(), "latest": period.end.timestamp()}


class Threads(IncrementalMessageStream):
@@ -220,7 +237,7 @@ def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Ite
"""

stream_state = stream_state or {}
channels_stream = Channels(authenticator=self._session.auth)
channels_stream = Channels(authenticator=self._session.auth, channel_filter=self.channel_filter)

if self.cursor_field in stream_state:
# Since new messages can be posted to threads continuously after the parent message has been posted, we get messages from the latest date
@@ -231,7 +248,7 @@ def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Ite
# If there is no state i.e: this is the first sync then there is no use for lookback, just get messages from the default start date
messages_start_date = pendulum.from_timestamp(self._start_ts)

messages_stream = ChannelMessages(authenticator=self._session.auth, default_start_date=messages_start_date)
messages_stream = ChannelMessages(parent=channels_stream, authenticator=self._session.auth, default_start_date=messages_start_date)

for message_chunk in messages_stream.stream_slices(stream_state={self.cursor_field: messages_start_date.timestamp()}):
self.logger.info(f"Syncing replies {message_chunk}")
@@ -321,12 +338,21 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
authenticator = self._get_authenticator(config)
default_start_date = pendulum.parse(config["start_date"])
threads_lookback_window = pendulum.Duration(days=config["lookback_window"])
channel_filter = config["channel_filter"]

channels = Channels(authenticator=authenticator, channel_filter=channel_filter)
streams = [
Channels(authenticator=authenticator),
ChannelMembers(authenticator=authenticator),
ChannelMessages(authenticator=authenticator, default_start_date=default_start_date),
Threads(authenticator=authenticator, default_start_date=default_start_date, lookback_window=threads_lookback_window),
channels,
ChannelMembers(authenticator=authenticator, channel_filter=channel_filter),
ChannelMessages(
parent=channels, authenticator=authenticator, default_start_date=default_start_date, channel_filter=channel_filter
),
Threads(
authenticator=authenticator,
default_start_date=default_start_date,
lookback_window=threads_lookback_window,
channel_filter=channel_filter,
),
Users(authenticator=authenticator),
]

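
As a rough illustration of how the new option threads through the connector (a sketch only, not part of the commit; the token and channel names are placeholders):

```python
# Sketch: exercising the channel filter added above. Assumes the classes from
# source_slack/source.py as changed in this commit; the token is a dummy value.
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator
from source_slack.source import Channels

authenticator = TokenAuthenticator(token="xoxb-dummy-token")

# Channels.parse_response drops every channel whose name is not in channel_filter,
# so the dependent streams (ChannelMembers, ChannelMessages, Threads) only produce
# slices for the selected channels. Because Channels now sets use_cache = True and
# ChannelMessages consumes it as an HttpSubStream parent, the channel list is
# fetched once and reused, which is where the sync-performance gain comes from.
channels = Channels(authenticator=authenticator, channel_filter=["general", "engineering"])

for record in channels.read_records(sync_mode=SyncMode.full_refresh):
    print(record["id"], record["name"])
```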
7 changes: 7 additions & 0 deletions airbyte-integrations/connectors/source-slack/source_slack/spec.json
@@ -26,6 +26,13 @@
"title": "Join all channels",
"description": "Whether to join all channels or to sync data only from channels the bot is already in. If false, you'll need to manually add the bot to all the channels from which you'd like to sync messages. "
},
"channel_filter": {
"type": "array",
"default": [],
"title": "Channel name filter",
"description": "A channel name list (without leading '#' char) which limit the channels from which you'd like to sync. Empty list means no filter.",
"examples": ["channel_one", "channel_two"]
},
"credentials": {
"title": "Authentication mechanism",
"description": "Choose how to authenticate into Slack",
3 changes: 3 additions & 0 deletions docs/integrations/sources/slack.md
@@ -38,6 +38,8 @@ This Source is capable of syncing the following core Streams:

The connector is restricted by normal Slack [requests limitation](https://api.slack.com/docs/rate-limits).

It is recommended to sync only the channels you need; this can be done by setting the `channel_filter` config variable in the connector settings.
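
For illustration only (not part of this commit), a source config that limits the sync to two channels might look like the following sketch; the values are placeholders and the field names follow the connector's `spec.json` above:

```python
# Hypothetical config: channel_filter takes plain channel names without the leading '#'.
# An empty list (the default) disables filtering and syncs every accessible channel.
config = {
    "api_token": "xoxb-dummy-token",           # placeholder credential
    "start_date": "2022-01-01T00:00:00Z",
    "lookback_window": 1,
    "join_channels": True,
    "channel_filter": ["general", "engineering"],
}
```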

The Slack connector should not run into Slack API limitations under normal usage. Please [create an issue](https://github.com/airbytehq/airbyte/issues) if you see any rate limit issues that are not automatically retried successfully.

## Getting started
@@ -111,6 +113,7 @@ We recommend creating a restricted, read-only key specifically for Airbyte acces

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.15 | 2022-03-31 | [11613](https://github.com/airbytehq/airbyte/pull/11613) | Add 'channel_filter' config and improve performance |
| 0.1.14 | 2022-01-26 | [9575](https://github.com/airbytehq/airbyte/pull/9575) | Correct schema |
| 0.1.13 | 2021-11-08 | [7499](https://github.com/airbytehq/airbyte/pull/7499) | Remove base-python dependencies |
| 0.1.12 | 2021-10-07 | [6570](https://github.com/airbytehq/airbyte/pull/6570) | Implement OAuth support with OAuth authenticator |
